美文网首页
Netty源码4--pipline

Netty源码4--pipline

作者: xian_cheng | 来源:发表于2018-11-05 15:44 被阅读0次

上篇介绍accept连接的过程中已经介绍服务端在收到一个客户端请求后,会将客户端和服务端建立的NioSocketChannel注册到NioEventLoopGroup,由NioEventLoop完成这个channel上的事件监听,由于Netty在接收到消息的处理流程大概如下:


netty时序图.png

(1)客户端发送消息到服务端。
(2)服务端监听对应channel上的io事件,监听到读事件后读取消息。
(3)将读取到的字节数据放入ByteBuf中,然后调用pipline的fireChannelRead方法,将这个ByteBuf中保存的数据传递给pipline,pipline中保存了一个链表,这个链表中每个节点里都存储一个channelHandler,在channelHandler中可以对ByteBuf中的数据进行校验、解码等操作。

这是一个非常粗的消息交互过程,主要是先清楚一下消息的大概处理流程,从上面的三个步骤来看,pipline是一个非常重要的对象,如果想要搞清楚服务器消息处理的流程,有必要研究下这个对象的源码实现。对于pipline这个对象,我想要搞清楚的主要是下面三个问题:
(1)channelHandler是怎么绑定到pipline中的?
(2) pipline的数据结构是怎样的?
(3)消息是怎么由一个channelHandler传递到另一个channelHandler的?

首先分析第一个问题,为了便于理解,先给出demo程序。

    public void start() {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .localAddress(new InetSocketAddress(port))
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new InBoundHandler1());
                        ch.pipeline().addLast(new InBoundHandler2());
                        ch.pipeline().addLast(new InBoundHandler3());
                    }
                });
       ....
    }

在上面server的启动程序中,.childHandler的作用就是将用户自定义的channelHandler添加到pipeline中。我们主要看下是在什么时候用户自定义的channelHandler添加到pipeline中的。
.childHandler的作用主要是初始化了ServerBootstrap这个对象的childHandler对象,将这个对象的引用指向ChannelInitializer的实现类。看下这个childHandler对象在哪使用用的,通过追溯代码发现是在ServerBootstrap的init方法中使用的。

    @Override
    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        final ChannelHandler currentChildHandler = childHandler;
        ....
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                ....
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

从上面代码中可以看到,在new ServerBootstrapAcceptor时将这个childHandler对象传递给了ServerBootstrapAcceptor类中的childHandler成员变量。继续看下ServerBootstrapAcceptor类中的childHandler在哪使用的,发现是在ServerBootstrapAcceptor类中的channelRead方法中使用的。

       @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
            ......
        }

前面的文章已经介绍这个方法是在接收到一个新连接时,在这个方法中将客户端和服务端建立的NioSocketChannel注册到NioEventLoopGroup,由NioEventLoop完成这个channel上的事件监听。本文关心的是pipline和channelHandler的关系,因此我们主要看下child.pipeline().addLast(childHandler)这句代码,跟下这句代码的实现。

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
        ....
    }

具体实现在DefaultChannelPipeline的addLast方法中,主要看下面这两句代码:

      newCtx = newContext(group, filterName(name, handler), handler);
      addLast0(newCtx);

首先创建一个DefaultChannelHandlerContext对象,childHandler的引用传给了这个对象,那么这个对象就持有了对childHandler的引用,然后调用addLast0方法。

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

从这个方法就可以很清楚的看到DefaultChannelPipeline保存了一个双向链表的数据结构,用于记录channelHandler的处理顺序。下图说明了DefaultChannelPipeline初始化时的数据结构。


DefaultChannelPipeline初始化链表结构.png

从上图可以看出,DefaultChannelPipeline初始化时有一个head节点和tail节点,每个节点对应的都是一个AbstractChannelHandlerContext类,如果对双向链表很清楚的话,AbstractChannelHandlerContext类中应该会有两个指针,一个指向前一个节点,一个指向后一个节点。
上图是初始化时的数据结构,那么执行addLast0方法之后的数据结构如下图所示:


DefaultChannelPipeline add last.png

这里为了直观一些,插入的节点用了channelHandler表示,其实这个节点也是一个AbstractChannelHandlerContext对象,这个对象持有了对childHandler的引用。我们知道这个childHandler指向的只是在demo程序中实现ChannelInitializer类initChannel方法的一个实现类,demo程序中InBoundHandler1、InBoundHandler2、InBoundHandler3这三个具体的channelHandler还没有添加到这个链表里,那这几个handler是在哪添加的呢,我们继续回到addLast这个方法的实现。

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
        ....
    }

这个方法首先将childHandler放入了一个新建的AbstractChannelHandlerContext对象,然后将这个AbstractChannelHandlerContext对象又放入了DefaultChannelPipeline的链表中。
然后根据registered这个布尔变量,判断是否进入if条件,registered这个布尔变量表示的是这个DefaultChannelPipeline对应的channel是否已经注册到NioEventLoopGroup,由于接收到一个新连接调用channelRead方法时先将childHandler放入DefaultChannelPipeline,然后才将客户端和服务端建立的NioSocketChannel注册到NioEventLoopGroup,所以此时registered布尔变量为false,进入if条件。进入if条件后,看下 callHandlerCallbackLater这个方法。

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;

        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;
        } else {
            // Find the tail of the linked-list.
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }

这个方法中将刚才新建的AbstractChannelHandlerContext对象放入了PendingHandlerAddedTask这个线程类中,看下这个线程类。

    private final class PendingHandlerAddedTask extends PendingHandlerCallback {
        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }

        @Override
        void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                                executor, ctx.name(), e);
                    }
                    remove0(ctx);
                    ctx.setRemoved();
                }
            }
        }
    }

这个线程类中一个实现了Runnable的run方法,一个实现了execute方法,这两个方法中调用了callHandlerAdded0这个方法,我们继续跟下这个方法的实现。

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.setAddComplete();
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            ...
            } catch (Throwable t2) {
            ....
    }

这个方法中调用了 ctx.handler().handlerAdded(ctx)方法,其中 ctx.handler()返回的是实现ChannelInitializer类initChannel方法的一个实现类,说明这里调用的是ChannelInitializer类中的handlerAdded方法,进入handlerAdded方法看下具体实现。

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {          
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }
            return true;
        }
        return false;
    }

在handlerAdded方法中调用initChannel(ChannelHandlerContext ctx)方法,我们在这个方法中发现调用了initChannel((C) ctx.channel())这个方法,这个方法是个抽象方法,具体实现就是在demo程序中。我们在demo中这个方法的具体实现如下:

.childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new InBoundHandler1());
                        ch.pipeline().addLast(new InBoundHandler2());
                        ch.pipeline().addLast(new InBoundHandler3());
                    }
                });

通过调用ch.pipeline().addLast这个方法将InBoundHandler1、InBoundHandler2、InBoundHandler3这三个具体的channelHandler添加DefaultChannelPipeline的链表中,这步操作完成后,DefaultChannelPipeline的链表结构如下所示:


pipline init.png

我们继续看下initChannel(ChannelHandlerContext ctx)方法,我们发现在将这三个handler放入链表中,最后调用了 remove(ctx)方法,这个方法是将实现ChannelInitializer类initChannel方法的一个实现类对应的AbstractChannelHandlerContext从链表中删除,这样操作后,DefaultChannelPipeline的链表结构就变成了下面这样。


pipline finish.png

经过上面的分析,我们就把前面两个问题给解决了,明白了channelHandler是怎么绑定到pipline中的以及pipline的数据结构是怎样的。

第三个问题关于消息是怎么由一个channelHandler传递到另一个channelHandler的,放到下一篇服务端消息处理来具体讲。

相关文章

网友评论

      本文标题:Netty源码4--pipline

      本文链接:https://www.haomeiwen.com/subject/cvcfxqtx.html