美文网首页
Netty源码分析(三) ChannelPipeline

Netty源码分析(三) ChannelPipeline

作者: skyguard | 来源:发表于2018-11-07 14:09 被阅读0次

    下面我们来分析Netty里的又一个重要的组件ChannelPipeline。
    每个Channel有一个pipeline,它是一个双向链表,有一个head和tail,netty的事件就是通过ChannelPipeline进行传递的。每个channel内部都会持有一个ChannelPipeline对象pipeline. pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。


    ChannelPipeline

    当channel完成register、active、read等操作时,会触发pipeline的相应方法。 1、当channel注册到selector时,触发pipeline的fireChannelRegistered方法。 2、当channel的socket绑定完成时,触发pipeline的fireChannelActive方法。 3、当有客户端请求时,触发pipeline的fireChannelRead方法。 4、当本次客户端请求,pipeline执行完fireChannelRead,触发pipeline的fireChannelReadComplete方法。
    下面来看看ChannelPipeline的结构


    DefaultChannelPipeline
    ChannelPipeline是一个接口,默认的实现是DefaultChannelPipeline,每个ChannelPipeline上有ChannelHandlerContext,而ChannelHandlerContext包含了ChannelHandler,每个ChannelHandler负责处理具体的业务逻辑。下面来看看DefaultChannelPipeline有哪些属性
    /**
     * Head 节点
     */
    final AbstractChannelHandlerContext head;
    /**
     * Tail 节点
     */
    final AbstractChannelHandlerContext tail;
    
    /**
     * 所属 Channel 对象
     */
    private final Channel channel;
    /**
     * 成功的 Promise 对象
     */
    private final ChannelFuture succeededFuture;
    

    1、TailContext实现了ChannelOutboundHandler接口。 2、HeadContext实现了ChannelInboundHandler接口。 3、head和tail形成了一个链表。

    对于Inbound的操作,当channel注册到selector时,触发pipeline的fireChannelRegistered,从head开始遍历,找到实现了ChannelInboundHandler接口的handler,并执行其fireChannelRegistered方法。

    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }
    

    服务启动过程中,ServerBootstrap在init方法中,会给ServerSocketChannel的pipeline添加ChannelInitializer对象,其中ChannelInitializer继承ChannelInboundHandlerAdapter,并实现了ChannelInboundHandler接口,所以当ServerSocketChannel注册到selector之后,会触发其channelRegistered方法。

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance. 解决并发问题
            try {
                // 初始化通道
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // 发生异常时,执行异常处理
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                // 从 pipeline 移除 ChannelInitializer
                remove(ctx);
            }
            return true; // 初始化成功
        }
        return false; // 初始化失败
    }
    

    在initChannel实现中,添加ServerBootstrapAcceptor实例到pipeline中。

    ServerBootstrapAcceptor继承自ChannelInboundHandlerAdapter,负责把接收到的客户端socketChannel注册到childGroup中,由childGroup中的eventLoop负责数据处理。

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    
            // 接受的客户端的 NioSocketChannel 对象
            final Channel child = (Channel) msg;
            // 添加 NioSocketChannel 的处理器
            child.pipeline().addLast(childHandler);
            // 设置 NioSocketChannel 的配置项
            setChannelOptions(child, childOptions, logger);
            // 设置 NioSocketChannel 的属性
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
    
            try {
                // 注册客户端的 NioSocketChannel 到 work EventLoop 中。
                childGroup.register(child).addListener(new ChannelFutureListener() {
    
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        // 注册失败,关闭客户端的 NioSocketChannel
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
    
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
    

    ChannelPipeline的分析就到这里了。

    相关文章

      网友评论

          本文标题:Netty源码分析(三) ChannelPipeline

          本文链接:https://www.haomeiwen.com/subject/lxqvxqtx.html