美文网首页Netty框架源码分析
7.Netty框架-Netty编程模板(编程步骤)

7.Netty框架-Netty编程模板(编程步骤)

作者: 还算年轻 | 来源:发表于2020-08-10 09:52 被阅读0次

    一、Netty编程模板

    1、Netty编程步骤:

    netty客户端/netty服务器端,代码流程:
    
    1.创建客户端类/服务器端(c/s)类
    2.创造c/s构造方法,主要传入参数为地址和端口
    3.创建c/s的引导器Bootstrap(ServerBootstrap)
    4.创建c/s的生命周期组EventLoopGroup为NioEventLoopGroup()
    5.配置装载引导器,把生命周期组EventLoopGroup装载到引导器,配置地址和端口,配置通道类型,配置操作句柄Handler
    4.填写Handler,初始化装载channel,并对channel配置一个或多个pipeline,即ChannelInboundHandler(ChannelOutboundHandler),相当于注册相应的逻辑程序(handler)
    5.创建ChannelFuture启动//引导启动连接绑定(connect/bind)阻塞
    6.阻塞关闭关闭占位符ChannelFuture
    7.关闭整个线程组
    
    客户端应用/服务器端应用代码编写流程
    
    编写操作继承ChannelInboundHandler(ChannelOutboundHandler)
    客户端重写channelActive,向服务端发送数据,服务端重写channelRead读取客户端的数据并向客户端发送数据
    客户端channelRead0读取服务端的数据
    重写异常处理
    

    2、Netty编程代码模板:

    <dependency>
           <groupId>io.netty</groupId>
           <artifactId>netty-all</artifactId>
          <version>4.1.6.Final</version>
    </dependency>
    
    //构造服务端方法
    public class SendServer {
        private final int port;
            public SendServer(int port) {
            this.port=port;
        }
      //服务器端启动
      private void start() throws Exception {
            EventLoopGroup group = null;
            try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();//创建server引导
            group = new NioEventLoopGroup();//reactor线程组
            serverBootstrap.group(group)
                .channel(NioServerSocketChannel.class)//设置channel类型
                .localAddress("localhost",port)//绑定端口
        //reacor handle
                .childHandler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {
                    @Override
                    protected void initChannel(
                            io.netty.channel.socket.SocketChannel ch)
                            throws Exception {
                        ch.pipeline().addFirst((ChannelHandler) new SendServerForWork());
                    }
                });
            ChannelFuture sync = serverBootstrap.bind().sync();//异步绑定
            System.out.println("开始监听,地址端口为:" + sync.channel());
            sync.channel().closeFuture().sync();
            }finally{
            group.shutdownGracefully().sync();
            }
        }
        public static void main(String[] args) throws Exception {
            System.out.println("server start.......");
            new SendServer(20000).start();
        }
     }
    
    
    
    public class SendServerForWork extends ChannelInboundHandlerAdapter{
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            //读取数据
            System.out.println("服务器读取数据。。");
            ByteBuf buf = (ByteBuf)msg;
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            String string = new String(bytes, "UTF-8");
            System.out.println("读取客户端的数据为:"+ string);
            //向客户端发送数据
            System.out.println("服务器向客户端发送数据...");
            String currenttime = new Date(System.currentTimeMillis()).toString();
            ByteBuf copiedBuffer = Unpooled.copiedBuffer(currenttime.getBytes());
            ctx.write(copiedBuffer);
            
        }
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("服务器读写数据完毕。。。");
            ctx.flush();
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            System.out.println("服务器异常处理。。。");
            ctx.close();
        }
    }
    
    
     
    public class SendClinet {
        private final String host;
        private final int port;
        
        //构造客户端方法
        public SendClinet(String host,int port){
            this.host=host;
            this.port=port;     
        }
        //客户端启动
        public void start() throws InterruptedException {
            //创建生命周期组,EventLoopGroup包含一个或多个EventLoop,而EventLoop在一个生命周期内只能绑定一个Thread
            //每一个EventLoop的I/O事件都是由这个Thread处理,一个channel在生命周期内只能对应一个EventLoop,
            //但一个EventLoop可以被分给一个或多个channel,因此channel和thread是对应的
            EventLoopGroup group = null;
            try {
                Bootstrap bootstrap = new Bootstrap();//创建一个引导启动类
                group = new NioEventLoopGroup();
                bootstrap.group(group)//把事件生命周期组EventLoopGroup注册引导启动类中去启动
                    .channel(NioSocketChannel.class)//注册channel类型为NioSocketChannel,这个类型还有NioSctpChannel,NioDatagramChannel,LocalServerChannel,EmbeddedChannel
                    .remoteAddress(new InetSocketAddress(host, port))//注册连接的服务器地址端口
                    //注册事件操作句柄,使用childHandler时候不可以,所以只能用handler代替了
                    .handler(new ChannelInitializer<io.netty.channel.socket.SocketChannel>() {
                        @Override
                        protected void initChannel(//初始化通道
                                io.netty.channel.socket.SocketChannel ch)
                                throws Exception {
                            //一个SocketChannel可以添加多个ChannelHandler,可以多加addLast,
                            //这个ChannelHandler,有两种In和Out即ChannelInboundHandler,ChannelOutboundHandler
                            //pipeline 在处理In和Out顺序是,in是从头部开始,out是尾部开始,例如 in1,out1,out2,in2,运行结果是in1->in2,out2->out1
                                                    //ChannelInboundHandler之间的传递,通过调用 ctx.fireChannelRead(msg) 实现;调用ctx.write(msg) 将传递到ChannelOutboundHandler,
                                                    //ctx.write()方法执行后,需要调用flush()方法才能令它立即执行,
                                                    //pipeline中outhandler不能放在最后,否则不生效
                            ch.pipeline().addLast((ChannelHandler) new SendClientForWork());
                            
                        }
                    });
    // 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。
                    ChannelFuture sync=bootstrap.connect().sync();//引导启动连接,ChannelFuture为将要执行操作的占位符
                    //sync.channel().close().sync();
                    sync.channel().closeFuture().sync();//关闭占位符,而不关闭整个通道,close是关闭整个客户端
                }finally {
                group.shutdownGracefully().sync();//关闭整个线程组
            }                
        }   
        public static void main(String[] args) throws Exception {
            System.out.println("client start....");
            new SendClinet("localhost",20000).start();//指定连接服务器的地址和端口
        }
    }
    
    
    //继承ChannelInboundHandler
    public class SendClientForWork extends SimpleChannelInboundHandler<ByteBuf>{
        // 客户端连接服务器后被调用,并向服务器发送数据
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("客户端连接服务器,并开始发送数据。。。");
            byte[] req = "查询时间".getBytes();//查询命令设置,并序列化
            ByteBuf buffer = Unpooled.buffer(req.length);
            buffer.writeBytes(req);
            ctx.writeAndFlush(buffer);//发送数据        
        };
        // 从服务器接收到数据后调用,处理收到的数据逻辑   
        @Override
        protected void channelRead0(ChannelHandlerContext arg0, ByteBuf arg1)
                throws Exception {
            System.out.println("客户端读取服务端的数据...");
            ByteBuf msg = arg1;
            byte[] bytes=new byte[msg.readableBytes()];
            msg.readBytes(bytes);//将msg消息读取到bytes中,并反序列化收到的消息
            String string = new String(bytes,"UTF-8");
            System.out.println("读取的服务端的数据为:" + string);
        }
        // 当连接发生异常时被调用
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            System.out.println("客户端异常处理。。。");
            ctx.close();//关闭上下文,释放资源
        }
    }
    

    相关文章

      网友评论

        本文标题:7.Netty框架-Netty编程模板(编程步骤)

        本文链接:https://www.haomeiwen.com/subject/iqvyrktx.html