美文网首页
Netty官方样例(Echo ‐ the very basic

Netty官方样例(Echo ‐ the very basic

作者: 东南枝下 | 来源:发表于2022-02-08 14:55 被阅读0次
    • 代码来源于netty官方样例

    netty响应式编程模型 Reactor pattern

    图片.png

    服务端代码

    这个样例的关键在于服务端,服务端需要建立两个线程组bossGroupworkerGroup分别用于处理连接请求和真正的客户端业务,对应的模型如上图

    服务端的关键代码分别是
    1. ServerBootstrap bootstrap = new ServerBootstrap();创建服务器端的启动对象
    2. bootstrap.group(bossGroup, workerGroup) ...链式的配置参数
    3. ChannelFuture f = bootstrap.bind(PORT).sync(); 绑定端口,启动

    服务端需要一个ChannelHandler来专注于业务逻辑的实现,只需要继承官方提供的标准Handler,重写相应的方法,实现具体的逻辑即可。

    此处修改官方样例,在与客户端连接成功时打印一条消息,在收到消息时将消息打印出来,并发送一条确认消息给客户端

    package example.echo;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.util.SelfSignedCertificate;
    
    /**
     * @author netty
     */
    public final class EchoServer {
    
        static final boolean SSL = System.getProperty("ssl") != null;
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                SelfSignedCertificate ssc = new SelfSignedCertificate();
                sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
            } else {
                sslCtx = null;
            }
    
            // Configure the server.
            // 创建两个线程组bossGroup和workerGroup,bossGroup只是处理连接请求,真正和客户端业务处理的是workerGroup
            // NioEventLoopGroup的子线程数默认是CPU核数的2倍
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            final EchoServerHandler serverHandler = new EchoServerHandler();
            try {
                // 关键代码 1:创建服务器端的启动对象
                ServerBootstrap bootstrap = new ServerBootstrap();
                // 关键代码 2:配置参数,链式
                // 设置两个线程组
                bootstrap.group(bossGroup, workerGroup)
                        // 使用NioServerSocketChannel作为服务器的通道实现
                        .channel(NioServerSocketChannel.class)
                        // 初始化服务器连接队列的大小,服务器端处理客户端的连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
                        // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                        .option(ChannelOption.SO_BACKLOG, 100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                if (sslCtx != null) {
                                    p.addLast(sslCtx.newHandler(ch.alloc()));
                                }
                                //p.addLast(new LoggingHandler(LogLevel.INFO));
                                // 对workerGroup的SocketChannel绑定处理器
                                p.addLast(serverHandler);
                            }
                        });
    
                // Start the server.
                // 关键代码 3:绑定端口,启动
                ChannelFuture f = bootstrap.bind(PORT).sync();
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    
    package example.echo;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    /**
     * @author netty
     */
    @Sharable
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 客户端连接服务器就会触发该方法
         *
         * @param ctx 上下文,内含通道channel,管道pipeline
         * @throws Exception 异常
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // super.channelActive(ctx);
            Channel channel = ctx.channel();
            System.out.println("客户端连接通道建立完成");
        }
    
        /**
         * 读取客户端发送的数据
         *
         * @param ctx 上下文
         * @param msg 客户端发送的数据
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // ctx.write(msg);
            ByteBuf buf = (ByteBuf) msg;
            String msgStr = buf.toString(CharsetUtil.UTF_8);
            Channel channel = ctx.channel();
            System.out.println("收到客户端发送的消息---->" + msgStr);
            channel.writeAndFlush(Unpooled.copiedBuffer("------>服务端回复,已经收到消息:" + msgStr, CharsetUtil.UTF_8));
        }
    
        /**
         * 数据读取完毕处理方法
         *
         * @param ctx 上下文
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            System.out.println("消息读取完毕........V");
            ctx.flush();
        }
    
        /**
         * 引发异常时处理
         *
         * @param ctx   上下文
         * @param cause 错误
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.out.println("异常发生........X");
            // Close the connection when an exception is raised.
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    客户端

    客户端写法和服务端差不多,不同的是启动的时候使用的是connect ->ChannelFuture channelFuture = b.connect(HOST, PORT).sync();

    在连接后写一个Scanner扫描控制台输入,来给客户端发消息

    调整客户端的Handler,在channelActive中给服务端发送一条连接成功的消息,在channelRead中将收到的消息打印出来,并注释原本代码,免得形成了客户端发送服务端回复的死循环

    package example.echo;
    
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
    import io.netty.util.CharsetUtil;
    
    import java.util.Scanner;
    
    
    /**
     * @author netty
     */
    public final class EchoClient {
    
        static final boolean SSL = System.getProperty("ssl") != null;
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
        static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
    
        public static void main(String[] args) throws Exception {
            // Configure SSL.git
            final SslContext sslCtx;
            if (SSL) {
                sslCtx = SslContextBuilder.forClient()
                        .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            } else {
                sslCtx = null;
            }
    
            // Configure the client.
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                if (sslCtx != null) {
                                    p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                                }
                                //p.addLast(new LoggingHandler(LogLevel.INFO));
                                p.addLast(new EchoClientHandler());
                            }
                        });
    
                // Start the client.
                ChannelFuture channelFuture = b.connect(HOST, PORT).sync();
    
                // 客户端连接后会返回一个channel,可以用此与服务端通信
                Channel channel = channelFuture.channel();
                System.out.println("=================" + channel.localAddress() + "===============");
                // 扫描控制台输入
                Scanner scanner = new Scanner(System.in);
                while (scanner.hasNextLine()) {
                    String msg = scanner.nextLine();
                    channel.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
                }
                // Wait until the connection is closed.
                channelFuture.channel().closeFuture().sync();
    
            } finally {
                // Shut down the event loop to terminate all threads.
                group.shutdownGracefully();
            }
        }
    }
    
    package example.echo;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    
    //    private final ByteBuf firstMessage;
    
    //    /**
    //     * Creates a client-side handler.
    //     */
    //    public EchoClientHandler() {
    //        firstMessage = Unpooled.buffer(EchoClient.SIZE);
    //        for (int i = 0; i < firstMessage.capacity(); i++) {
    //            firstMessage.writeByte((byte) i);
    //        }
    //    }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            // 连接成功后给客户端发送一条消息
            ByteBuf firstMessage = Unpooled.copiedBuffer("客户端连接成功发送的第一条消息......OK", CharsetUtil.UTF_8);
            ctx.writeAndFlush(firstMessage);
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf bf = (ByteBuf) msg;
            System.out.println(bf.toString(CharsetUtil.UTF_8));
    //        ctx.write(msg);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // Close the connection when an exception is raised.
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

    实现效果

    • 客户端
    =================/127.0.0.1:54400===============
    ------>服务端回复,已经收到消息:客户端连接成功发送的第一条消息......OK
    hello
    ------>服务端回复,已经收到消息:hello
    你好
    ------>服务端回复,已经收到消息:你好
    
    • 服务端
    客户端连接通道建立完成
    收到客户端发送的消息---->客户端连接成功发送的第一条消息......OK
    消息读取完毕........V
    收到客户端发送的消息---->hello
    消息读取完毕........V
    收到客户端发送的消息---->你好
    消息读取完毕........V
    

    参考学习: https://www.bilibili.com/video/BV1fA41157Ht?share_source=copy_web
    netty官方文档:https://netty.io/wiki/index.html
    netty官方样例:https://netty.io/4.1/xref/io/netty/example/echo/package-summary.html

    相关文章

      网友评论

          本文标题:Netty官方样例(Echo ‐ the very basic

          本文链接:https://www.haomeiwen.com/subject/rommkrtx.html