美文网首页Magic Netty
Netty分享之Channel

Netty分享之Channel

作者: 逅弈 | 来源:发表于2018-02-06 23:35 被阅读15次

    gris原创 转载请注明原创出处,谢谢!

    我们知道netty中的ByteBuf组件主要是为了替代NIO中的ByteBuffer类而重新设计的,与此类似的还有一个组件:Channel。
    Channel组件是为了替代Socket而重新设计的,Channel的接口所提供给我们的API,大大地降低了直接使用Socket时的复杂性。本篇文章,就让我们来了解下netty中的Channel组件。

    什么是Channel

    Netty中对Channel是这样定义的:

    A nexus to a network socket or a component which is capable of I/O
    operations such as read, write, connect, and bind.

    简单来说Channel是与网络套接字相关的,一个具有诸如:读、写、连接、绑定等能力的组件。

    通过Channel我们可以得到以下的数据:

    • 一个Channel的当前状态,比如该通道是打开状态还是连接的状态
    • 可以得到该Channel的ChannelConfig,并可以得到相关的配置信息
    • 该Channel所支持的IO操作:读、写、连接或者绑定
    • 与该Channel绑定的用来处理该Channel上所有的事件和请求的ChannelPipeline

    我们知道netty高性能的关键之一是他的IO模型,对于netty来说所有的IO操作都是异步的,而这都得益于Channel中所有方法的执行都是异步的。一个IO请求来了之后是立即返回的并且不保证调用结束时改IO操作已经完成了,取而代之的是:一个IO请求来了之后,请求者会得到一个ChannelFuture的实例,该实例会在你的IO操作真正完成后(包括成功,失败,被取消)通知你,然后你就可以得到具体的IO操作的结果。
    Channel是有层级关系的,一个Channel会有一个对应的parent,该parent也是一个Channel。并且根据Channel的创建不同,他的parent也会不一样。例如,一个SocketChannel连接上ServerSocketChannel之后,该SocketChannel的parent就会是该ServerSocketChannel。层次结构的语义取决于Channel使用了何种传输实现方式。比如我们可以重新定义一种Channel的实现,在该Channel上创建一个共享此通道的用来连接SSH的子通道。

    PS: 需要特别注意的是,当你在一个Channel上完成了所有的操作之后,记得要调用close()方法来释放资源,这是非常有必要的。

    Channel的接口定义

    下面让我们来看下Channel接口中一些重要的方法定义:

    ChannelId id();
    EventLoop eventLoop();
    Channel parent();
    ChannelConfig config();
    
    boolean isOpen();
    boolean isRegistered();
    boolean isActive();
    boolean isWritable();
    
    ChannelPipeline pipeline();
    ByteBufAllocator alloc();
    
    Channel read();
    Channel flush();
    
    // 以下方法继承自父接口
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture disconnect(ChannelPromise promise);
    ChannelFuture close(ChannelPromise promise);
    ChannelFuture deregister(ChannelPromise promise);
    ChannelOutboundInvoker read();
    ChannelFuture write(Object msg, ChannelPromise promise);
    ChannelOutboundInvoker flush();
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    
    • id()方法将返回一个全局唯一的ChannelId,该ChannelId由以下规则构成:
      1:服务器的Mac地址
      2:当前的进程ID
      3:System.currentTimeMillis()
      4:System.nanoTime()
      5:一个随机的32位整数
      6:一个顺序增长的32位整数
    • eventLoop()方法将返回分配给该Channel的EventLoop,一个EventLoop就是一个线程,用来处理连接的生命周期中所发生的事件
    • parent()方法将返回该Channel的父Channel
    • config()方法将返回该Channel的ChannelConfig,ChannelConfig中包含了该Channel的所有配置设置,并且支持热更新
    • pipeline()方法将返回该Channel所对应的ChannelPipeline
    • alloc()方法将返回分配给该Channel的ByteBufAllocator,可以用来分配ByteBuf

    除了以上这些方法外,Channel还继承了ChannelOutboundInvoker接口。该接口中的大部分方法返回值都是ChannelFuture(还有一部分方法的返回值是ChannelPromise),这也证明了Channel的所有操作都是异步的说法。因为一个ChannelFuture就是一个异步方法执行结果的占位符。这个方法什么时候被执行可能取决于若干的因素,因此无法准确的预测。但是有一点可以肯定的是他将会被执行,并且所有关联在同一个Channel上的操作都被保证将以他们被调用的顺序被执行。

    Channel的使用示例

    Channel被设计为线程安全的,因此我们可以将得到的Channel的引用存储起来,当需要往远程节点写数据时,从存储的地方获取到该Channel然后直接使用就可以了。下面列举一个从多个线程中往一个Channel中写数据的例子,代码如下:

    public void writingToChannelFromManyThreads() {
        // 从其他地方得到一个Channel
        final Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere
        //创建持有要写数据的ByteBuf
        final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8);
        //创建将数据写到Channel的Runnable
        Runnable writer = new Runnable() {
            @Override
            public void run() {
                channel.write(buf.duplicate());
            }
        };
        //获取到线程池Executor的引用
        Executor executor = Executors.newCachedThreadPool();
    
        //递交写任务给线程池以便在某个线程中执行
        // write in one thread
        executor.execute(writer);
    
        //递交另一个写任务以便在另一个线程中执行
        // write in another thread
        executor.execute(writer);
        //省略其他的操作...
    }
    

    从上面的例子中我们可以看出,对于多线程的操作而言,Channel天生是线程安全的。不必担心写的数据会发出错乱或者其他的问题。

    另外,Channel的所有IO操作都是异步的,通过为ChannelFuture添加一个ChannelFutureListener,便可以在该操作完成之后得到通知,一个示例代码如下:

    public void writingToChannel() {
        Channel channel = CHANNEL_FROM_SOMEWHERE; // Get the channel reference from somewhere
        //创建持有要写数据的 ByteBuf
        ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8);
        //执行writeAndFlush会理解得到一个ChannelFuture
        ChannelFuture future = channel.writeAndFlush(buf);
        //添加ChannelFutureListener以便在写操作完成后接收通知
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                //写操作完成,并且没有错误发生
                if (future.isSuccess()) {
                    System.out.println("Write successful");
                } else {
                    //记录错误
                    System.err.println("Write error");
                    future.cause().printStackTrace();
                }
            }
        });
        //Channel写完数据后会紧接着执行后面的代码,并不会阻塞后面的代码
        doOtherThings();
    }
    

    总结

    以上,我们就对Channel有了一个大概的了解,总结一下可以得到以下几点结论:

    • 一个Channel就相当于一个Socket
    • Channel拥有以下一些IO操作的能力,包括连接,绑定,断开,读写数据等
    • Channel中的所有方法都是异步的,方法执行的返回值是一个ChannelFuture
    • ChannelFuture可以通过添加Listener的方式在方法执行完毕的时候通知方法的调用者
    • Channel被设计为线程安全的,我们不必担心线程安全的问题,并且可以将Channel的引用保存起来,以便后面使用。

    我是逅弈,如果文章对您有帮助,欢迎您点赞加关注,并欢迎您关注我的公众号:

    欢迎关注微信公众号

    相关文章

      网友评论

        本文标题:Netty分享之Channel

        本文链接:https://www.haomeiwen.com/subject/dzwnzxtx.html