netty(八)初识Netty-channel

作者: 我犟不过你 | 来源:发表于2021-11-10 11:16 被阅读0次

    一、channel

    1.1 channel的主要方法

    1)close() 可以用来关闭 channel
    2)closeFuture() 用来处理 channel 的关闭,有如下两种方式

    sync 方法作用是同步等待 channel 关闭
    而 addListener 方法是异步等待 channel 关闭

    3)pipeline() 方法添加处理器
    4)write() 方法将数据写入
    5)writeAndFlush() 方法将数据写入并刷出

    1.2 什么是channelFuture?

    我们看下面一段客户端代码,也是前面文章使用的代码

        public static void main(String[] args) throws InterruptedException {
            Channel channel = new Bootstrap()
                    .group(new NioEventLoopGroup(1))
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            System.out.println("init...");
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .channel(NioSocketChannel.class)
                     .connect("localhost", 8080)
                    .sync()
                    .channel();
    
            channel.writeAndFlush("ccc");
            Thread.sleep(1000);
            channel.writeAndFlush("ccc");
        }
    

    主要看到调用connect()方法除,此处返回的其实是一个ChannelFuture 对象,通过channel()方法可以获得channel对象。

        public ChannelFuture connect(String inetHost, int inetPort) {
            return this.connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
        }
    
    public interface ChannelFuture extends Future<Void> {
        Channel channel();
        ... ...
    }
    

    需要注意的是,这个connect方法是一个异步的方法,调用过后实际并没有建立连接,所以我们得到的ChannelFuture对象中并不能立刻获得正确的channel。

    通过下面的例子看一下现象,启动一个服务端,端口8080,这里不提供服务端代码了,使用前面的就行。启动我们写好的测试客户端代码:

    public class ChannelFutureTest {
    
        public static void main(String[] args) throws Exception {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) {
    
                        }
                    })
                    .connect("localhost", 8080);
    
            System.out.println(channelFuture.channel());
    
            //同步等待连接
            channelFuture.sync();
            System.out.println(channelFuture.channel());
        }
    }
    

    结果:

    [id: 0x7aa12c28]
    [id: 0x7aa12c28, L:/127.0.0.1:52286 - R:localhost/127.0.0.1:8080]
    

    如上结果所示,首先打印只有一个id地址,当执行sync()方法,此处会同步阻塞等待连接,如果一直无法连接会抛出超时异常。当成功建立连接后,会继续执行,并打印出如上结果最后一行的内容。

    除使用sync()这个同步方法以外,还有一种异步的方式:

            // 异步
            channelFuture.addListener((ChannelFutureListener) future -> {
                System.out.println(future.channel());
            });
    

    结果:

    [id: 0xd9d474f1]
    [id: 0xd9d474f1, L:/127.0.0.1:59564 - R:localhost/127.0.0.1:8080]
    

    ChannelFutureListener 会在连接建立时被调用(其中 operationComplete 方法),这里是一个函数式接口调用。

    1.3 什么是CloseFuture?

    我们同通过一段代码演示一下,此处涉及到channel的close方法,和CloseFuture的close方法。关闭是为了释放占用的资源。

    看如下一段代码:

    public class ChannelFutureTest {
    
        public static void main(String[] args) throws Exception {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect("localhost", 8080);
    
            // 同步等待连接
            Channel channel = channelFuture.sync().channel();
    
            new Thread(()->{
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String line = scanner.nextLine();
                    if ("q".equals(line)) {
                        System.out.println("关闭channel");
                        // close 异步操作 1s 之后
                        channel.close();
                        break;
                    }
                    channel.writeAndFlush(line);
                }
            }, "input").start();
    
            System.out.println("处理channel关闭后的操作");
    }
    

    如上代码所示,我们的客户端,允许用户手动输入q进行关闭程序,否则就发送内容到服务端。

    但是按照如上代码直接运行客户端,发现System.out.println("处理channel关闭后的操作");这条命令直接执行了,因为我们的主要关闭业务逻辑是启用子线程实现的。

    也就是说我们在子线程,即channel还没有关闭就执行了代码,这样可能导致我们的业务逻辑存在问题。

    所以我们需要在channel关闭后才进行打印,真实场景中就是channel关闭后进行剩余业务操作。

    我们需要在 System.out.println("处理channel关闭后的操作"); 之前增加以下的代码:

    // 获取closefuture
    ChannelFuture closeFuture = channel.closeFuture();
     //同步阻塞
    closeFuture.sync();
    

    上述代码会获取到一个closeFuture对象,sync方法会同步阻塞在此,直到子线程当中的channel真正关闭了,才会继续执行代码。

    输入1、2、3、q,直接看结果:

    1
    2
    3
    q
    关闭channel
    处理channel关闭后的操作
    

    与channelFuture相同,closeFuture除了有sync方法进行同步阻塞,仍然也可以使用异步方式进行监听channel是否关闭的状态。

    将 System.out.println("处理channel关闭后的操作"); 放在以下代码:

            closeFuture.addListener((ChannelFutureListener) future -> System.out.println("处理channel关闭后的操作"); ); 
    

    输入1、2、3、q,看结果:

    1
    2
    3
    q
    关闭channel
    处理channel关闭后的操作
    

    提供一个NioEventLoopGroup专门用于关闭。

    NioEventLoopGroup group = new NioEventLoopGroup();
    

    通过上面的代码我们已经能够成功监测到channel的关闭了,但是相信实践过朋友们会发现我们channel虽然关闭了,但是整个程序仍然在运行,整体的资源没有做到全部释放,这是应为EventLoopGroup当中的线程没有停止,这里需要引入一个方法:

    shutdownGracefully()

    这个方式是EventLoopGroup当中的方法。我们需要做以下操作,为了大家看,我把所有的客户端内容全放在以下代码中了:

    public class ChannelFutureTest {
    
        public static void main(String[] args) throws Exception {
            // 将group提出来,不能匿名方式,为了后面调动shutdownGracefully()方法
            NioEventLoopGroup group = new NioEventLoopGroup();
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect("localhost", 8080);
    
            // 同步等待连接
            Channel channel = channelFuture.sync().channel();
    
            new Thread(() -> {
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String line = scanner.nextLine();
                    if ("q".equals(line)) {
                        System.out.println("关闭channel");
                        // close 异步操作 1s 之后
                        channel.close();
                        break;
                    }
                    channel.writeAndFlush(line);
                }
            }, "input").start();
    
    
            // 处理channel关闭后的操作
            // 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
            ChannelFuture closeFuture = channel.closeFuture();
    
            //同步
            //closeFuture.sync();
    
            //异步 - EventLoopGroup线程未关闭
            //closeFuture.addListener((ChannelFutureListener) future -> System.out.println("处理channel关闭后的操作"));
    
            //异步 - EventLoopGroup线程优雅关闭
            closeFuture.addListener((ChannelFutureListener) future -> group.shutdownGracefully());
        }
    }
    

    结果:

    1
    2
    3
    q
    关闭channel
    
    Process finished with exit code 0
    

    关于channel的介绍就这么多,有帮助的话点个赞吧。

    相关文章

      网友评论

        本文标题:netty(八)初识Netty-channel

        本文链接:https://www.haomeiwen.com/subject/dnebzltx.html