美文网首页
(五)自定义的handler是如何被添加的

(五)自定义的handler是如何被添加的

作者: guessguess | 来源:发表于2021-04-22 20:41 被阅读0次

在使用netty的时候,由于经常要自定义channelHander,去处理我们自己的业务,或者是使用特定的解码器,编码器。
demo代码如下,其中TimeClientHandler是自定义的一个channelHandler

public class TimeClient {
    public static void main(String args[]) {
        connect();
    }
    
    private static void connect() {        
        //用于客户端处通道的读写
        EventLoopGroup work = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(work).option(ChannelOption.TCP_NODELAY, true).channel(NioSocketChannel.class)
                .handler(new TimeClientHandler());
        ChannelFuture cf = null;
        try {
            //一直阻塞,直到连接上服务端
            cf = b.connect(ConnectConfig.getHost(), ConnectConfig.getPort()).sync();
            //一直阻塞,直到该通道关闭
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //避免线程没有杀死
            work.shutdownGracefully();
        }
    }
}

TimeClientHandler代码如下

public class TimeClientHandler extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
        ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
    }
}

那么channel是如何维护自己的channelHandler链呢?
首先有一个比较重要的概念。ChannelPipeline。每个channel都会有一个自己的channelPipeline。
在讲ChannelPipeline之前,先了解一下ChannelPipeline的结构。

ChannelPipeline是ChannelInboundInvoker与ChannelOutboundInvoker的子类。
结构比较简单,就不画图了。
ChannelInboundInvoker与ChannelOutboundInvoker其实都是接口,没有任何实现。

ChannelInboundInvoker

channelInBoundInvoker这个接口,有点像回调,都是channel完成各种类的操作之后,去调用对应的方法。
这个fire有点像发事件类似。
public interface ChannelInboundInvoker {
    ChannelInboundInvoker fireChannelRegistered();
    ChannelInboundInvoker fireChannelUnregistered();
    ChannelInboundInvoker fireChannelActive();
    ChannelInboundInvoker fireChannelInactive();
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);
    ChannelInboundInvoker fireUserEventTriggered(Object event);
    ChannelInboundInvoker fireChannelRead(Object msg);
    ChannelInboundInvoker fireChannelReadComplete();
    ChannelInboundInvoker fireChannelWritabilityChanged();
}

ChannelOutboundInvoker

ChannelOutboundInvoker是主动发起的,比如channel的的connect,disconnect等等。
public interface ChannelOutboundInvoker {
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture disconnect(ChannelPromise promise);
    ChannelFuture close(ChannelPromise promise);
    ChannelFuture deregister(ChannelPromise promise);
    ChannelOutboundInvoker read();
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);
    ChannelOutboundInvoker flush();
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);
    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);
    ChannelPromise voidPromise();
}

ChannelPipeline

现在结合一下上述接口,说明了ChannelPipeline具备的基本功能,那就是即可以主动去做一些事情(如连接),也可以被动的去做一些事情(如作为回调,比如fireChannelReadComplete,读取完成之后去做些什么)。
另外ChannelPipeline提供的一些方法, 可以对ChannelHandler进行增删改查。另外对接口的一些方法进行了覆写,返回类型改为ChannelPipeline.

    ChannelPipeline addFirst(String name, ChannelHandler handler);
    ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
    ChannelPipeline addLast(String name, ChannelHandler handler);
    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
    ChannelPipeline addFirst(ChannelHandler... handlers);
    ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
    ChannelPipeline addLast(ChannelHandler... handlers);
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
    ChannelPipeline remove(ChannelHandler handler);
    ChannelHandler remove(String name);
    <T extends ChannelHandler> T remove(Class<T> handlerType);
    ChannelHandler removeFirst();
    ChannelHandler removeLast();
    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
    <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
                                         ChannelHandler newHandler);
    ChannelHandler first();
    ChannelHandlerContext firstContext();
    ChannelHandler last();
    ChannelHandlerContext lastContext();
    ChannelHandler get(String name);
    ChannelHandlerContext context(ChannelHandler handler);
    ChannelHandlerContext context(String name);
    ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
    Channel channel();
    List<String> names();
    Map<String, ChannelHandler> toMap();

    @Override
    ChannelPipeline fireChannelRegistered();

     @Override
    ChannelPipeline fireChannelUnregistered();

    @Override
    ChannelPipeline fireChannelActive();

    @Override
    ChannelPipeline fireChannelInactive();

    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);

    @Override
    ChannelPipeline fireUserEventTriggered(Object event);

    @Override
    ChannelPipeline fireChannelRead(Object msg);

    @Override
    ChannelPipeline fireChannelReadComplete();

    @Override
    ChannelPipeline fireChannelWritabilityChanged();

    @Override
    ChannelPipeline flush();

介绍完以上的概念。
下面先说一下,ChannelPipeline是如何初始化的。
这里又不得不回到Channel的初始化。

ChannelPipeline初始化

channelPipeline的初始化代码如下,channel进行实例化的时候,会顺便将channelPipeline实例化

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        channel的实例化以及初始化
        pipeline = newChannelPipeline();
    }

    protected DefaultChannelPipeline newChannelPipeline() {
         //由此看来channelPipeline的类型是DefaultChannelPipeline
        // 下面看看DefaultChannelPipeline的构造方法
        return new DefaultChannelPipeline(this);
    }
}

channelPipeline的实例化

public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        //创建tail head的容器,是一个双向链表。AbstractChannelHandlerContext 是前面提到的ChannelInboundInvoker以及ChannelOutboundInvoker的子类
        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            setAddComplete();
        }
    }

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
    }
}

tail以及head的实例化, 从代码中是可以看出tail是属于InBound类型,而head是属于OutBound类型

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
}

ChannelPipeline初始化完之后,接下来就是看如何将ChannelHandlerContext放到channelPipeline中

如何将ChannelHandlerContext放到channelPipeline中

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            实例化通道
            channel = channelFactory.newChannel();
             初始化通道,在初始化通道的时候,会构建一个新的ChannelHandlerContext,并且加入到channel对应的ChannelPipeline中。具体还是看看Init方法内部,是在BootStrap中实现的
             ===========================
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
            }
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        省略
    }
}
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    @Override
    @SuppressWarnings("unchecked")
    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        //这个handler其实就是我们自己定义的handler。具体看DefalutChannelPipeline中的实现。
        p.addLast(config.handler());
        省略.....
}

DefalutChannelPipeline中addLast的实现。
public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //检查是否已经加过了
            checkMultiplicity(handler);
           //生成新的channelHandlerContext.这个group传进来的是空,而channelPipeline跟channel有关系,EventExecutorGroup 就会使用channel所绑定的EventLoop
            newCtx = newContext(group, filterName(name, handler), handler);
            ============================这个方法需要重点看看,其实说白了就是将这个ChannelHandlerContext插入到Head以及tail中间。代码在下面。
            addLast0(newCtx);
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            下面说白了就是将该ChannelHandlerContext的handlerState改为ADD_COMPLETE
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }


    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
}

通过以上,就将新生成的ChannelHandlerContext插入到ChannelPipeline中去了。

如何将ChannelHandler放入ChannelPipeline中

其实是在初始化完channel,后面进行注册的时候。
具体的代码位置。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final DefaultChannelPipeline pipeline;
    protected abstract class AbstractUnsafe implements Unsafe {
        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //通道进行注册。
                doRegister();
                neverRegistered = false;
                registered = true;
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);

                通道对应的pipeline进行注册事件的发起。....................................................这里是需要重点看的。
                因为pipeline对应的实现类是DefaultChannelPipeline。所以直接进入方法内部查看
                pipeline.fireChannelRegistered();

                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
}

public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        //因为一开始注册就是在异步中发起的,所以必然在eventLoop的线程中。
        if (executor.inEventLoop()) {
            所以逻辑会走到这里。是一个private方法。
            具体往里面走============================================
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

    private void invokeChannelRegistered() {
        这个方法是true,因为传进来的head,本身的handlerState == ADD_COMPLETE,所以会返回true
        if (invokeHandler()) {
            try {
                head实现的方法就是返回自身。而channelRegistered这个方法,也被覆写了。
                接下来看看headContext对于channelRegistered的实现。=====================
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }
}

public class DefaultChannelPipeline implements ChannelPipeline {
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            invokeHandlerAddedIfNeeded();

            fireChannelRegister方法是在AbstractChannelHandlerContext中实现的
            ctx.fireChannelRegistered();
        }
    }
}

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
     //因为一开始传进来是head,说白了就是找到第一个为InBound的handlerContext
     //因为一开始就是next,所以这个东西,就是我们上面生成的ctx(ChannelHandlerContext),巧了一开始我们定义的channelHander恰好是ChannelInboundHandler的子类,且生成ctx的时候,inbound属性为true。
     private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

   @Override
    public ChannelHandlerContext fireChannelRegistered() {
        //这个方法的返回值就是前面生成的ctx。往下走.....就是下面这个方法了。
        invokeChannelRegistered(findContextInbound());
        return this;
    }

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                handler方法的实现在DefalutChannelHandlerContext。就是返回ChannelHandler.其实就是我们自己定义的ChannelHandler. 我们自己定义的ChannelHandler就是TimeClientHandler。
                channelRegistered这个方法是在ChannelInitializer中实现的。
                因此看具体实现。
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }
}

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        这个方法就会调用,我们去实现的initChannel方法了。
        if (initChannel(ctx)) {
            再发起channelRegister.因为可能我们不止添加一个ChannelInitializer。还有剩余的ChannelHandler需要被加进去。
            ctx.pipeline().fireChannelRegistered();
        } else {
            ctx.fireChannelRegistered();
        }
    }

    @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        //判断是否初始化过,initMap是线程安全的map。相当于一个锁的操作。
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
            try {
                再往里面走。这里就是我们自己实现的方法了。
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                最后把生成的ctx移除。从链表中移除
                remove(ctx);
            }
            return true;
        }
        return false;
    }

    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        assert ctx != head && ctx != tail;
        synchronized (this) {
            remove0(ctx);
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }

    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
}

public class TimeClientHandler extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        重复上述的过程。。。
        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
        ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
        ch.pipeline().addLast(new TimeClientChannelHandlerAdapter());
    }
}

添加的ChannelHandler是如何传播的呢?

public class TimeClientHandler extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        重复上述的过程。。。
        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
        ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
    }
}


public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        这个方法就会调用,我们去实现的initChannel方法了。
        if (initChannel(ctx)) {
            再发起channelRegister.因为可能我们不止添加一个ChannelInitializer(实现这个接口可以自行添加ChannelHandler)。还有剩余的ChannelHandler需要被加进去。
            原先一开始是 head -> ctx(包含了我们自定义的TimeClientHandler ) -> tail
            后面变成head ->LineBasedFrameDecoder->StringDecoder->tail
            所以再次fire的时候,当然是会把新加进去的channelHandler的channelRegistered方法执行一遍。
            其实像LineBasedFrameDecoder,StringDecoder的channelRegistered都是用父类的实现,所以都是简单的做一个传播,并没有太复杂的实现。===========================代码如下
            ctx.pipeline().fireChannelRegistered();
        } else {
            ctx.fireChannelRegistered();
        }
    }
}

因为我们自定义添加的LineBasedFrameDecoder,StringDecoder都是ChannelInboundHandlerAdapter 的子类
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        //再次传播
        ctx.fireChannelRegistered();
    }
}

至此,就完了~也说明了一个东西。InBound的channelHandler是从前到后去执行的。
总结一下。
1.在channel实例化的时候,先实例化ChannelPipeline.在此过程中会实例化head和tail,这俩个channelHandlerContext的链。
2.随后,在channel初始化完,就要进行注册。
3.注册的过程中,当channel注册到对应的selector中后,就要将对应的ChannelHandler都注册到ctx(channelHandlerContext)链中。
4.首先会从链的头部,head开始,找到第一个为Inbound的ctx,逐级传播。如果在中间遇到实现了ChannelInitializer的子类,可能会往中间插入ctx,再把旧的ctx从链中移除,然后再次逐级传播(避免有ctx没有被插入链中),然后再次从head逐级传播。当然很多channelHandler的注册方法,其实只是为了传播,没有对channelPipeline中的ctx链做任何操作。

相关文章

网友评论

      本文标题:(五)自定义的handler是如何被添加的

      本文链接:https://www.haomeiwen.com/subject/pidzlltx.html