美文网首页
Netty源码分析之pipeline

Netty源码分析之pipeline

作者: xiehongm_信陵 | 来源:发表于2018-11-06 14:41 被阅读0次
承接题意,平铺直叙。Netty中每个channel都有一个pipeline,可以看下channel的层次结构:

其实在之前的客户端和服务端初始化的时候已经说过了,在初始化Channel的时候,同时初始化pipeline;

//AbstractChannel
protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
//DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

从DefaultChannelPipeline即可知道初始化pipeline的时候,head和tail是双向链表。可以看下它们的继承关系,再进行其初始化的观察。

HeadContext:
TailContext:

从继承结构可以看出,HeadContext和TailContext都继承了AbstractChannelHandlerContext和实现了ChannelOutboundHandler/ChannelInboundHandler,说明用户双重属性,既是context同时也是handler,按我的理解意味着其即拥有上下文属性也拥有handler属性(处理业务逻辑)。在文章开始的图里已经说明:每个channel包含一个pipeline,而pipeline又维护了一个双向链表。

TailContext和HeadContext


这是TailContext和HeadContext构造函数,需要注意的是TailContext的inbound为true,outbound为false,HeadContext则相反,这两个参数和netty事件流向有关,具体情况下文说明。

重新分析ChannelInitializer和自定义handler的添加

Bootstrap.connect()-->Bootstrap.doResolveAndConnect()-->AbstractBootstrap.initAndRegister()-->Bootstrap.init()

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());
    。。。
}

config.handler()获取的就是ChannelInitializer,p.addLast(config.handler());就是把ChannelInitializer加入双向链表,看代码:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

让我们来看看其中的关键代码

newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

由上可知,在加入ChannelInitializer的过程中可以知道,为了添加一个 handler 到pipeline中, 会把此handler包装成ChannelHandlerContext。同时addLast0说明ChannelInitializer是添加在tail之前。这个过程中注意下两个有意思的方法:

private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

从源码中可以看到, 当一个handler实现了ChannelInboundHandler接口, 则 isInbound 返回真; 类似地, 当一个handler实现了ChannelOutboundHandler接口, 则isOutbound就返回真。ChannelInitializer是实现了ChannelInboundHandlerAdapter,所以inbound传入的是true。

自定义handler的添加

addLast方法中的另一条关键代码如下:

callHandlerAdded0(newCtx);

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    。。。    
    ctx.handler().handlerAdded(ctx);
    。。。
}

ctx.handler()取到的自然是ChannelInitializer,而handlerAdded(ctx)都做了什么呢:

handlerAdded()-->boolean initChannel()-->void initChannel()

public void  handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

ChannelInitializer在加入双向链表后,调用重写initChannel()方法,在initChannel()方法中加入自定义handler,最后remove(ctx);移除ChannelInitializer。
回过头来看下channel的结构层次图,在初始化channel的时候会构建一个pipeline座位channel的属性(pipeline也有一个channel属性),每个pipeline维护了一个由ChannelHandlerContext 组成的双向链表. 这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext中又关联着一个ChannelHandler。


pipeline的传输机制

从上面的分析,我们知道AbstractChannelHandlerContext中有inbound和outbound两个boolean变量, 分别用于标识Context所对应的handler的类型, 即:

  • inbound为真时, 表示对应的ChannelHandler实现了ChannelInboundHandler方法.
  • outbound 为真时, 表示对应的 ChannelHandler 实现了 ChannelOutboundHandler 方法.
    pipieline的事件传输类型有两种:inbound事件和outbound事件两种:
                                             I/O Request
                                        via Channel or
                                    ChannelHandlerContext
                                                  |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
              |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+

这个是netty官方文档,可以很明显地看出:inbound事件和outbound事件的流向相反。 inbound 的传递方式是通过调用相应的ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的传递方式是通过调用 ChannelHandlerContext.OUT_EVT() 方法。例如ChannelHandlerContext.fireChannelRegistered()调用会发送一个ChannelRegistered 的inbound给下一个ChannelHandlerContext, 而ChannelHandlerContext.bind调用会发送一个bind的outbound事件给下一个 ChannelHandlerContext。
让我们来看下inbound事件传播的方法有哪些:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead()
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught()
ChannelHandlerContext.fireUserEventTriggered()
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

outbound事件传播的方法有:

ChannelHandlerContext.bind()
ChannelHandlerContext.connect()
ChannelHandlerContext.write()
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect()
ChannelHandlerContext.close()

让我们具体来看看这两类事件

outbound事件

outbound事件是请求事件,inbound事件是通知事件,这个要区分清楚。请求事件就是请求某件事即将发生,然后outbound事件进行通知。outbound事件的流向是:

tail -> customContext -> head

让我们用connect事件代码来证明:
当调用Bootstrap.connect()的时候,会触发一个outbound事件。以下是调用链

Bootstrap.connect -> Bootstrap.doConnect -> AbstractChannel.connect
让我们看看AbstractChannel.connect

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}
//pipeline.connect的实现如下:
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

当 outbound 事件(这里是 connect 事件)传递到 Pipeline 后, 它其实是以 tail 为起点开始传播的。而 tail.connect 其实调用的是AbstractChannelHandlerContext.connect 方法。继续跟进,在AbstractChannelHandlerContext中connect方法:

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    。。。
    final AbstractChannelHandlerContext next = findContextOutbound()
    next.invokeConnect(remoteAddress, localAddress, promise);
    。。。
}

让我们看下其中的关键代码:

private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

顾名思义,findContextOutbound就是找出以this context(tail)为基本节点,找出第一个outbound为true的context,然后通过ctx调用invokeConnect方法,如果

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        connect(remoteAddress, localAddress, promise);
    }
}

从tail往head方向获取handler并且调用其connect,如果用户没有从写这个方法,那么会调用ChannelOutboundHandlerAdapter实现的方法:

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}

connect又会调用AbstractChannelHandlerContext中connect方法找到下一个outbound为true的handler调用其connect,这样的循环中,直到connect事件传递到DefaultChannelPipeline的双向链表的头节点, 即 head 中(head的outbound设置为true)。

Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect

outbound事件传到head后,因为head本身也是handler,handler()返回的的就它本身,让我们看看它connect方法的实现:

public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}
到这边outbound事件就结束了。

inbound事件

inbound是通知事件,就是说某件事情已经发生了,然后利用inbound事件进行通知。inbound事件的传输方向和outbound刚好相反:

head -> customcontext -> tail

沿着connect继续走,在之后会有inbound事件,我们就以这个为例子进行inbound事件讲解。
承接上文,之前看到head的connect方法:

public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

这里unsafe.connect调用的是AbstractNioChannel.connect(),关键代码如下:

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        ···
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } 
        ···
}

在doConnect完成连接之后调用了fulfillConnectPromise,

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        return;
    }
    boolean active = isActive();
    boolean promiseSet = promise.trySuccess();
    if (!wasActive && active) {
        pipeline().fireChannelActive();
    }
    if (!promiseSet) {
        close(voidPromise());
    }
}

让我们看pipeline().fireChannelActive();pipeline().fireChannelActive()将通道激活的消息(即 Socket 连接成功)发送出去。这里就是inbound事件的起点,往下走看这个过程是怎么样的:

public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

很明显,以head(HeadContext)为起点,让我们看下在invokeChannelActive做了什么

static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelInactive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelInactive();
            }
        });
    }
}
//next.invokeChannelInactive()实现
private void invokeChannelInactive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelInactive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelInactive();
    }
}

这个方法和 Outbound 的对应方法(例如 invokeConnect) 如出一辙. 同Outbound一样, 如果用户没有重写channelActive方法, 那么会调用ChannelInboundHandlerAdapter 的 channelActive 方法:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
}

public ChannelHandlerContext fireChannelActive() {
    invokeChannelActive(findContextInbound());
    return this;
}

和outbound事件一样,一样的循环,最后事件传输到tail。tail 本身既实现了ChannelInboundHandler接口, 又实现了ChannelHandlerContext接口,因此当channelActive消息传递到tail后,会将消息转递到对应的ChannelHandler中处理,tail的handler()返回的就是tail本身,最后的channelActive即是tail中的。
inbound事件到这里也就结束了。

相关文章

网友评论

      本文标题:Netty源码分析之pipeline

      本文链接:https://www.haomeiwen.com/subject/oiucxqtx.html