美文网首页
Netty 基础核心组件

Netty 基础核心组件

作者: holmes000 | 来源:发表于2020-04-27 12:14 被阅读0次

    bossgroup和workgroup都是NioEeventLoopGroup

    1.pipeline中handler 主要用作对数据的处理,底层是双向链表;
    每个channel都有一个channelPipeline对应;
    每个pipeline都由 ChannelHandlerContext节点组成的双向链表构成,每个ChannelHandlerContext都对应一个handler;
    出站事件的执行顺序会从tail节点往前传递到最后一个handler;
    入站事件的执行顺序会从head节点往后传递到最后一个handler;
    常用方法:1addFirst() 2 addLast()


    image.png
    1. ChannelHandlerContext
      常用方法


      image.png

      3.ChannelOption


      image.png
      4.EventLoopGroup和实现子类NioEventLoopGroup
      image.png
      image.png

      WorkerEeventLoopGroup中EeventLoop默认数量是核数的两倍;
      WorkerEeventLoopGroup默认是按顺序next方式选择一个EventLoop来将SocketChannel注册到其selector中
      常用方法:
      构造方法
      shutdownGracefully() 断开连接,关闭线程
      5.Unpooled


      image.png
      image.png
      不需要使用flip进行反转;因为底层维护了readerindex和writerindex;
      writerindex到capacity是可写范围;
      readerindex到writerindex是可读范围;
      0-readerindex是已读范围;
      常用方法:
      copeidBuffer
    public class NettyServer {
        public static void main(String[] args) throws Exception {
            //处理连接
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            //处理业务
            EventLoopGroup workGroup = new NioEventLoopGroup();
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                //配置
                serverBootstrap.group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128) //任务阻塞队列
                        .childOption(ChannelOption.SO_KEEPALIVE, true) //保持活跃连接
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyServerHandler());
                            }
                        });
                System.out.println("服务端准备完毕。。。");
                //绑定端口,生成channelFuture
                ChannelFuture channelFuture = serverBootstrap.bind(6888).sync();
                //对关闭通道监听
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }
    }
    
    /**
     * 自定义Handler
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        /**
         * ChannelHandlerContext 上下文,可以获得pipeline,通道channel
         * msg 客户端发送的数据
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("服务器读取线程" + Thread.currentThread().getName());
            System.out.println("ser ctx.." + ctx);
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("客户端发送消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:" + ctx.channel().remoteAddress());
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //写入buffer并刷新
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端", CharsetUtil.UTF_8));
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            System.out.println("核数:"+NettyRuntime.availableProcessors());
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(eventExecutors)
                        .channel(NioSocketChannel.class)//设置客户端通道的实现类
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyClientHandler());
                            }
                        });
                System.out.println("客户端准备 ok。。");
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6888).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                eventExecutors.shutdownGracefully();
            }
        }
    }
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
        /**
         * 当通道准备好就触发该方法
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client" + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server 喵", CharsetUtil.UTF_8));
        }
    
        /**
         * 当通道有读取事件,会触发
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("服务器回传消息:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    

    相关文章

      网友评论

          本文标题:Netty 基础核心组件

          本文链接:https://www.haomeiwen.com/subject/rwoewhtx.html