美文网首页
尚硅谷Netty 案例

尚硅谷Netty 案例

作者: 手扶拖拉机_6e4d | 来源:发表于2021-06-14 16:43 被阅读0次
    • 导入netty的library包


      image.png

    下载所有的netty包 io.netty:netty-all

    image.png
    选择4.1.20的版本: io.netty:netty-all:4.1.20.Final
    image.png
    • Netty demo1

    package cn.netty.project1;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.protobuf.ProtobufDecoder;
    
    public class NettyServer {
        public static void main(String[] args) throws InterruptedException {
            /*
                bossGroup和workerGroup含有的子线程(NioEventLoop)个数默认等于cp核数 * 2
             */
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try{
                // 创建服务器的启动对象,配置参数
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
                        .channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel作为服务器的通道实现
                        .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数
                        .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
    // 注意:可以通过一个集合管理SocketChannel,在推送消息时,可以将业务加入到各个channel对应的NIOEventLoop的taskQueue或者scheduleTaskQueue
                                System.out.println("客户端SocketChannel hashCode=" + socketChannel.hashCode());
                                socketChannel.pipeline().addLast(new NettyServerHandler());
                            }
                        });
    
                System.out.println("...服务器is ready...");
    
                //绑定一个端口并且同步生成了一个ChannelFuture对象
                // 启动服务器并启动监听
                ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
    
                // 对关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    
    package cn.netty.project1;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import java.nio.ByteBuffer;
    
    /**
     说明:
        自定义一个Handler需要继承netty规定好的某个HandlerAdapter适配器
        这时候我们自定义的Handler才能称之为handler
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 读取数据事件:可以读取客户端发送的消息
         * @param ctx : 上下文对象,含有管道pipeline,通道channel,地址
         * @param msg: 客户端发送的数据
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server ctx=" + ctx);
    
            /*
                将客户端发送的数据msg转成byteBuffer
             */
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("客户端发送的消息=" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址=" + ctx.channel().remoteAddress()); // 拿到客户端地址
        }
    
        /**
         * 读取数据完毕
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 将数据写入缓存并刷新
            // 对发送的数据进行编码
            ctx.writeAndFlush(Unpooled.copiedBuffer("hell 客户端", CharsetUtil.UTF_8));
        }
    
        /**
         * 处理异常,关闭通道
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    package cn.netty.project1;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyClient {
        public static void main(String[] args) throws InterruptedException {
            // 客户端需要一个事件循环组
            NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
            try{
                // 创建客户端启动对象
                Bootstrap bootstrap = new Bootstrap();
                // 设置相关参数
                bootstrap.group(nioEventLoopGroup) //设置线程组
                        .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new NettyClientHandler());
                            }
                        });
    
                System.out.println("客户端is ok...");
    
                //启动客户端连接服务端
                // 关于channel要分析,涉及到netty的异步模型
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
    
                // 给关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } finally {
                nioEventLoopGroup.shutdownGracefully();
            }
        }
    }
    
    package cn.netty.project1;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufUtil;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;
    import io.netty.util.CharsetUtil;
    
    /**
        说明:
        1.Inbound代表入栈
     */
    public class NettyClientHandler extends  ChannelInboundHandlerAdapter{ //SimpleChannelInboundHandler<HttpContent>
    
        /**
         * 通道准备就绪就会调用该方法
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello server", CharsetUtil.UTF_8));
        }
    
        /*
            当通道有读取事件时就会触发
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
    
            System.out.println("服务器回复的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("服务器地址:" + ctx.channel().remoteAddress());
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

    服务器端输出:
    ...服务器is ready...
    server ctx=ChannelHandlerContext(NettyServerHandler#0, [id: 0x9bb8b3b5, L:/127.0.0.1:6668 - R:/127.0.0.1:65274])
    客户端发送的消息=hello server
    客户端地址=/127.0.0.1:65274

    客户端输出:
    客户端is ok...
    服务器回复的消息:hell 客户端
    服务器地址:/127.0.0.1:6668

    • TaskQueue自定义任务
    • TaskQueue自定义任务

    任务队列中的task有3种经典使用场景
    1.用户程序自定义的普通任务


    image.png

    2.用户自定义定时任务
    3.非当前Reactor线程调用Channel的各种方法
    例如在推送系统的业务线程里面,根据用户的标识找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景,最终的write会提交到任务队列中后被异步消费

    demo:自定义普通任务

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 模拟耗时操作(异步执行->提交到该channel对应的NIOEventLoop的taskQueue中)
            Thread.sleep(10000); // 睡眠10秒
            ctx.writeAndFlush(Unpooled.copiedBuffer("first hello java!", CharsetUtil.UTF_8));
            System.out.println("go on...");
        }
    

    服务器端输出:
    服务器is ready...
    go on...

    客户端输出:
    客户端is ok...
    服务器回复的消息:first hello java!second hello 客户端
    服务器地址:/127.0.0.1:6668

    把耗时操作放入taskQueue
    会先输出“second hello 客户端”, 再输出“first hello java!”

     @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(10000); // 睡眠10秒
                        ctx.writeAndFlush(Unpooled.copiedBuffer("first hello java!", CharsetUtil.UTF_8));
                    } catch (Exception e){
                        System.out.println("捕获异常=" + e.getMessage());
                    }
                }
            });
        }
    

    客户端输出:
    客户端is ok...
    服务器回复的消息:second hello 客户端
    服务器地址:/127.0.0.1:6668
    服务器回复的消息:first hello java!
    服务器地址:/127.0.0.1:6668

    2.自定义scheduleTask

     @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 用户自定义定时任务(提交到scheduleTaskQueue)
            ctx.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(5 * 1000); // 睡眠5秒
                        ctx.writeAndFlush(Unpooled.copiedBuffer("first hello java!" + "当前时间=" + DateFormat.getTimeInstance().format(new Date()), CharsetUtil.UTF_8));
                    } catch (Exception e){
                        System.out.println("捕获异常=" + e.getMessage());
                    }
                }
            }, 5, TimeUnit.SECONDS);
    
            ctx.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(10 * 1000); // 睡眠5秒
                        ctx.writeAndFlush(Unpooled.copiedBuffer("third hello java!" + "当前时间=" + DateFormat.getTimeInstance().format(new Date()), CharsetUtil.UTF_8));
                    } catch (Exception e){
                        System.out.println("捕获异常=" + e.getMessage());
                    }
                }
            }, 10, TimeUnit.SECONDS);
    
            System.out.println("go on...");   // 在这一行打断点
        }
    

    ctx->pipeline->channel->eventLoop->scheduledTaskQueue


    image.png

    客户端输出:
    客户端is ok...
    服务器回复的消息:second hello 客户端当前时间=23:30:26
    服务器地址:/127.0.0.1:6668
    服务器回复的消息:first hello java!当前时间=23:30:36
    服务器地址:/127.0.0.1:6668
    服务器回复的消息:third hello java!当前时间=23:30:46
    服务器地址:/127.0.0.1:6668

    方案再说明:

    • 1.Netty抽象出两组线程池,BossGroup专门负责接受客户端连接,WorkerGroup专门负责网络读写操作
    • 2.NioEventLoop表示一个不断循环执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通道
    • 3.NioEventLoop内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO线程NioEventLoop负责
    • 4.NioEventLoopGroup下包含多个NioEventLoop
      每个NioEventLoop中包含一个Selector,一个taskQueue
      每个NioEventLoop的Selector上可以注册监听多个NioChannel
    • 5.每个NioChannel只会绑定在唯一的NioEventLoop上
    • 6.每个NioChannel都绑定有一个自己的ChannelPipeLine

    群聊系统
    编写一个Netty群聊系统,实现服务器端和客户端之间的数据简单通信(非阻塞)
    实现多人群聊
    服务器端:监测用户上线、离线,并实现消息转发功能
    客户端:通过channel可以无阻塞发送消息给其他所有用户,同时可以接受其他用户发送的消息(由服务器转发得到)


    image.png
    //GroupChatServer
    package cn.netty.groupchat2;
    
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class GroupChatServer {
    
        private int port;
    
        public GroupChatServer(int port) {
            this.port = port;
        }
    
        public static void main(String[] args) throws Exception {
            new GroupChatServer(7011).run();
        }
    
        public void run() throws Exception{
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline =  socketChannel.pipeline();
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new GroupChatServerHandler());
                            }
                        });
    
                System.out.println("netty 服务器启动");
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                //监听关闭
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    
    // GroupChatServerHandler
    package cn.netty.groupchat2;
    
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.util.concurrent.GlobalEventExecutor;
    
    import java.text.SimpleDateFormat;
    
    public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    
        //使用一个hashmap 管理
        //public static Map<String, Channel> channels = new HashMap<String,Channel>();
    
        //定义一个channle 组,管理所有的channel
        //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
        private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
            //获取到当前channel
            Channel channel = channelHandlerContext.channel();
            //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
            channelGroup.forEach((ch)->{
                if (channel != ch){  //不是当前的channel,转发消息
                    ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
                } else { //回显自己发送的消息给自己
                    ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
                }
            });
        }
    
    
        /**
         * handlerAdded 表示连接建立,一旦连接,第一个被执行,将当前channel 加入到  channelGroup
         * @param ctx
         * @throws Exception
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
            channelGroup.add(channel);
        }
    
        /**
         * 表示channel 处于不活动状态, 提示 xx离线了
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
            System.out.println("channelGroup size" + channelGroup.size());
        }
    
        /**
         * 表示channel 处于活动状态, 提示 xx上线
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress() + " 上线了~");
        }
    
        /**
         * 断开连接, 将xx客户离开信息推送给当前在线的客户
         * @param ctx
         * @throws Exception
         */
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channel.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
            System.out.println("channelGroup size" + channelGroup.size());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    //GroupChatClient
    package cn.netty.groupchat2;
    
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.util.Scanner;
    
    public class GroupChatClient {
        private int port ;
        private String host ;
    
        public GroupChatClient(int port, String host) {
            this.port = port;
            this.host = host;
        }
    
        public void run() throws Exception{
            NioEventLoopGroup group = new NioEventLoopGroup();
    
            try (Scanner scanner = new Scanner(System.in)){
                Bootstrap bootstrap = new Bootstrap().group(group)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast(new GroupChatClientHandler());
                            }
                        });
    
                ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
                Channel channel = channelFuture.channel();
                System.out.println("-------" + channel.localAddress()+ "--------");
    
                while (scanner.hasNextLine()){
                    String msg = scanner.nextLine();
                    channel.writeAndFlush(msg + "\r\n");
                }
    
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception{
            new GroupChatClient( 7011, "127.0.0.1").run();
        }
    }
    
    // GroupChatClientHandler
    package cn.netty.groupchat2;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
            System.out.println(msg.trim());
        }
    }
    
    

    相关文章

      网友评论

          本文标题:尚硅谷Netty 案例

          本文链接:https://www.haomeiwen.com/subject/izefsltx.html