美文网首页
(七)channelHandler的执行顺序以及原理

(七)channelHandler的执行顺序以及原理

作者: guessguess | 来源:发表于2021-04-27 16:47 被阅读0次

在前面讲了ChannelPipeline、ChannelHandler以及ChannelHandlerContext的结构。
下面就来看看ChannelHandler的执行过程。以及Inbound,outBound对应的ChannelHandler的执行顺序。
下面先上demo

pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.gee</groupId>
  <artifactId>nio-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.16.Final</version>
        </dependency>


        <!-- 时间工具类 start -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
            <version>1.18.8</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 指定maven编译的jdk的版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

代码

ChannelHandler

其中ABC都是ChannelInboundHandler的子类。
DEF都是ChannelOutboundHandler的子类。

/**
 * Demo inbound handler "A": prints a marker when the channel-registered event
 * reaches it, then forwards the event through its context.
 */
public class AChannelHandler extends ChannelInboundHandlerAdapter {

    /** Marker printed each time this handler sees the event. */
    private static final String MARKER = "inbound A";

    @Override
    public void channelRegistered(final ChannelHandlerContext ctx) throws Exception {
        System.out.println(MARKER);
        // Forward the event; without this call the propagation stops at this handler.
        ctx.fireChannelRegistered();
    }
}
/**
 * Demo inbound handler "B": prints a marker when the channel-registered event
 * reaches it, then forwards the event through its context.
 */
public class BChannelHandler extends ChannelInboundHandlerAdapter {

    /** Marker printed each time this handler sees the event. */
    private static final String MARKER = "inbound B";

    @Override
    public void channelRegistered(final ChannelHandlerContext ctx) throws Exception {
        System.out.println(MARKER);
        // Forward the event; without this call the propagation stops at this handler.
        ctx.fireChannelRegistered();
    }
}
/**
 * Demo inbound handler "C": prints a marker when the channel-registered event
 * reaches it, then forwards the event through its context.
 */
public class CChannelHandler extends ChannelInboundHandlerAdapter {

    /** Marker printed each time this handler sees the event. */
    private static final String MARKER = "inbound C";

    @Override
    public void channelRegistered(final ChannelHandlerContext ctx) throws Exception {
        System.out.println(MARKER);
        // Forward the event; without this call the propagation stops at this handler.
        ctx.fireChannelRegistered();
    }
}

/**
 * Demo outbound handler "D": prints a marker when a connect request passes
 * through it, then hands the request on through its context.
 */
public class DChannelHandler extends ChannelOutboundHandlerAdapter {

    /** Marker printed each time this handler sees a connect request. */
    private static final String MARKER = "outbound D";

    @Override
    public void connect(
            final ChannelHandlerContext ctx,
            final SocketAddress remoteAddress,
            final SocketAddress localAddress,
            final ChannelPromise promise) throws Exception {
        System.out.println(MARKER);
        // Pass the request on unchanged; omitting this call would stop the propagation here.
        ctx.connect(remoteAddress, localAddress, promise);
    }
}

/**
 * Demo outbound handler "E": prints a marker when a connect request passes
 * through it, then hands the request on through its context.
 */
public class EChannelHandler extends ChannelOutboundHandlerAdapter {

    /** Marker printed each time this handler sees a connect request. */
    private static final String MARKER = "outbound E";

    @Override
    public void connect(
            final ChannelHandlerContext ctx,
            final SocketAddress remoteAddress,
            final SocketAddress localAddress,
            final ChannelPromise promise) throws Exception {
        System.out.println(MARKER);
        // Pass the request on unchanged; omitting this call would stop the propagation here.
        ctx.connect(remoteAddress, localAddress, promise);
    }
}
/**
 * Demo outbound handler "F": prints a marker when a connect request passes
 * through it, then hands the request on through its context.
 */
public class FChannelHandler extends ChannelOutboundHandlerAdapter {

    /** Marker printed each time this handler sees a connect request. */
    private static final String MARKER = "outbound F";

    @Override
    public void connect(
            final ChannelHandlerContext ctx,
            final SocketAddress remoteAddress,
            final SocketAddress localAddress,
            final ChannelPromise promise) throws Exception {
        System.out.println(MARKER);
        // Pass the request on unchanged; omitting this call would stop the propagation here.
        ctx.connect(remoteAddress, localAddress, promise);
    }
}

服务端

/**
 * Demo server: binds to a fixed local port and installs the three inbound demo
 * handlers (A, B, C) on every accepted child channel.
 */
public class Server {
    /** Port the demo server listens on. */
    private static final int PORT = 9527;

    public static void main(String[] args) {
        start();
    }

    /**
     * Configures the bootstrap, binds to {@link #PORT} and blocks until the
     * server channel is closed. Both event loop groups are always shut down
     * before the method returns.
     */
    public static void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(boss, work);
        sb.channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // Handlers are appended in order A, B, C.
                        ch.pipeline().addLast(new AChannelHandler());
                        ch.pipeline().addLast(new BChannelHandler());
                        ch.pipeline().addLast(new CChannelHandler());
                    }
                });
        try {
            ChannelFuture cf = sb.bind(PORT).sync();
            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}

客户端

/**
 * Demo client: connects to the demo server and installs three inbound handlers
 * (A, B, C) followed by three outbound handlers (D, E, F) on its channel.
 */
public class Client {
    /** Port of the demo server. */
    private static final int PORT = 9527;
    /** Host of the demo server. */
    private static final String HOST = "127.0.0.1";

    public static void main(String[] args) {
        connect();
    }

    /**
     * Configures the bootstrap, connects to {@link #HOST}:{@link #PORT} and
     * blocks until the channel is closed. The event loop group is always shut
     * down before the method returns.
     */
    public static void connect() {
        NioEventLoopGroup work = new NioEventLoopGroup();
        Bootstrap bs = new Bootstrap();
        bs.group(work);
        bs.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // Handlers are appended in order A..F; A-C are inbound, D-F are outbound.
                        ch.pipeline().addLast(new AChannelHandler());
                        ch.pipeline().addLast(new BChannelHandler());
                        ch.pipeline().addLast(new CChannelHandler());
                        ch.pipeline().addLast(new DChannelHandler());
                        ch.pipeline().addLast(new EChannelHandler());
                        ch.pipeline().addLast(new FChannelHandler());
                    }
                });
        try {
            ChannelFuture cf = bs.connect(HOST, PORT).sync();
            // Block until the channel is closed.
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }
    }
}

最后分别启动服务端以及客户端,观察运行结果。


client的运行结果

运行结果

从运行结果来看。
inBound的channelHandler 是顺序执行的。A->B->C
而outBound的channelHandler是逆序执行。F->E->D。
先记住,inbound是顺序,而outBound是逆序的。至于为什么后面再说。

为什么inBound是顺序的?

就还是从源码入手,一步一步看吧。
从我们的demo入手。
其实之前已经讲过channelHandler是如何被插入到ChannelHandlerContext的链中的,这里就不说了。
这里主要还是讲一下执行顺序。

这里还是从channel的connect开始。
channel在connect之前,当然是需要一系列的初始化,比如注册到对应的selector中,将这个channel对应的channelHandler都放到channelPipeline对应的链中。

代码入口。以channel的初始化为例子。

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            通过工厂返回channel实例。这里主要是会涉及到ChannelPipeline的实例化以及初始化。
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
              省略.....
        }
        channel初始化完,要做的事情当然就是注册到selector中了。
        所以直接看这里面的代码即可。往下面看
        ChannelFuture regFuture = config().group().register(channel);
        省略......
        return regFuture;
    }
}

channel注册到selector中后,会通过pipeline发起注册事件,用于去完善一些后续操作。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        register方法最后,定位到如下方法。
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected abstract class AbstractUnsafe implements Unsafe {
        private void register0(ChannelPromise promise) {
            try {
                //channel注册
                doRegister();
                channel注册完之后,有一些需要后续处理的事情。所以开始fire。。。发射。
                为什么要通过pipeline发射呢?待会就知道了。因为pipeline本身管理着ChannelHandlerContext的链,即channelHandler的链。
                pipeline.fireChannelRegistered();
                省略..........
            } catch (Throwable t) {
                省略..........
            }
        }
    }
}

为什么inBound是顺序?因为执行顺序是从head发起的。

因为channel中使用的channelPipeline默认的类就是DefaultChannelPipeline 
public class DefaultChannelPipeline implements ChannelPipeline {
    链的头部
    final AbstractChannelHandlerContext head;
    链的尾部
    final AbstractChannelHandlerContext tail;
    
    为什么是顺序执行,看到这里大家可能就懂了吧?
    因为各种fire方法。。。ChannelInBoundInvoker接口的实现,都是从头开始遍历的。
    所以,inbound的channelHandler的执行顺序必然是顺序的。
    @Override
    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }
}

那么如何执行呢?

从上面的代码片段,往里面走。

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    //这个context是head,从代码上来看,最重要的还是head实现的invokeChannelRegistered方法
    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

    private boolean invokeHandler() {
        // Store in local variable to reduce volatile reads.
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

    private void invokeChannelRegistered() {
        head的状态是ADD_COMPLETE,所以必然if(true)
        if (invokeHandler()) {
            try {
                //直接往代码里面走,headContext返回自身。
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }
}

public class DefaultChannelPipeline implements ChannelPipeline {
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
        @Override
        public ChannelHandler handler() {
            return this;
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            invokeHandlerAddedIfNeeded();
            //进行传播,head也是AbstractChannelHandlerContext的子类。方法就是在AbstractChannelHandlerContext实现的。
            ctx.fireChannelRegistered();
        }
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        找到下一个为Inbound的ctx。
        再回到最初的方法。这里其实是一个递归操作。
        invokeChannelRegistered(findContextInbound());
        return this;
    }

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
   
    是不是又回到一开始的入口了?
    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                拿到context自身对应的channelHandler,如果覆写了的话会执行覆写方法。
                如果没覆写的话,其实就是继续ctx.fireChannelRegistered.在父类中实现。
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }
}

可能看着有点绕。那就画个图吧。


inbound的channelHandler的执行过程

其实下面部分是一个递归。
ChannelHandlerContext的一个抽象子类AbstractChannelHandlerContext已经实现了invokeChannelRegistered方法,说白了就是继续传播,传播到下一个ChannelHandlerContext。下一个ChannelHandlerContext对应的channelHandler可以去实现ChannelInboundHandler里面的方法,决定要在这个过程中做什么,或者是要不要继续传播。如果不传播就停下来了,待会举一个例子。如果要继续传播,就需要调用ctx.fireXxx方法。理论上来说,一直传播的话,会到达TailContext里面的方法,最后结束,因为TailContext是最后一个inbound属性的context。
下面看看TailContext中的代码。其实我们可以看到很多空方法,说白了就是执行结束,直接出栈。

/**
 * Excerpt: the tail context of the pipeline. It is an inbound handler that acts as
 * its own handler; most of its callbacks are empty, so an event that reaches it
 * is not forwarded any further.
 */
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            setAddComplete();
        }

        /** Returns this context itself as the handler. */
        @Override
        public ChannelHandler handler() {
            return this;
        }

        // The callbacks below are intentionally empty: nothing is done and nothing is forwarded.
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception { }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception { }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }

        /** Releases the user event instead of forwarding it. */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // This may not be a configuration error and so don't log anything.
            // The event may be superfluous for the current pipeline configuration.
            ReferenceCountUtil.release(evt);
        }

        /** Hands an exception that reached the tail to {@code onUnhandledInboundException}. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            onUnhandledInboundException(cause);
        }

        /** Hands a message that reached the tail to {@code onUnhandledInboundMessage}. */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            onUnhandledInboundMessage(msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
    }

如何使传播中断呢?

那就改一下上面的代码,比如将BChannelHandler的代码进行调整,其他不变。
将ctx.fireChannelRegistered注释。最后运行。

/**
 * Variant of demo inbound handler "B" that prints its marker but does not
 * forward the channel-registered event.
 */
public class BChannelHandler extends ChannelInboundHandlerAdapter {

    /** Marker printed each time this handler sees the event. */
    private static final String MARKER = "inbound B";

    @Override
    public void channelRegistered(final ChannelHandlerContext ctx) throws Exception {
        System.out.println(MARKER);
        // The forwarding call is deliberately left commented out so the event stops here:
        // ctx.fireChannelRegistered();
    }
}

运行结果如下,之前是ABCFED,现在变成ABFED,所以我们可以通过决定要不要往下传播,从而去控制某个channelHandler是否要执行。


运行结果

最后来说说OutBound

为什么OutBound是逆序的呢?
答案估计大家都猜到了,从tail发起,一级一级的找到每个outBound属性的channelHandlerContext即可。
但是还是看看代码吧。
channel初始化完,注册完的操作,就是连接到服务端。

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    /**
     * Excerpt: submits the connect call to the channel's event loop.
     *
     * @param remoteAddress  the address to connect to
     * @param localAddress   the local address; when null the two-argument connect is used
     * @param connectPromise the promise for the connect; its channel is the one connected
     */
    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // Choose the connect overload depending on whether a local address was given.
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                // Attach the CLOSE_ON_FAILURE listener to the connect promise.
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }
}

public class DefaultChannelPipeline implements ChannelPipeline {
    /**
     * Excerpt: starts the connect operation at the {@code tail} context and
     * returns whatever that call returns.
     */
    @Override
    public final ChannelFuture connect(
            SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, localAddress, promise);
    }
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    逐级往前查,把一个一个属性为outBound的ctx找出来。
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }


    @Override
    public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        找出下一个属性为outBound的ctx
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            执行,若没有实现的话,则通过父类继续往下传播。
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                 最后还是走到这步了。继续往下走。
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }
}

这个适配器类的默认实现,其实就是继续传播。
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        继续往下走
        ctx.connect(remoteAddress, localAddress, promise);
    }
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            继续往下走
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                这个方法走到最后,其实是HeadContext实现的。
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }
}

public class DefaultChannelPipeline implements ChannelPipeline {

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        @Override
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }
}

如何终止传播呢?不要继续用ctx去主动操作即可。
大概就是这样子了。
总结一下,inbound的channelHandler为什么是顺序执行的,是因为从head发起的,然后逐级找到inbound为true的ctx。如果需要停止传播,在实现的ChannelInBoundHandler的方法里面去停止fire即可。
而outBound的channelHandler是逆序的,是因为从tail发起的,逐级找到outBound为true的ctx。如果需要停止传播,也是在实现的ChannelOutBoundHandler的方法里面,不再用ctx去执行outBound的方法即可。

相关文章

网友评论

      本文标题:(七)channelHandler的执行顺序以及原理

      本文链接:https://www.haomeiwen.com/subject/iszerltx.html