美文网首页
2019-05-19 pipeline 初始化、新增、删除操作

2019-05-19 pipeline 初始化、新增、删除操作

作者: Terminalist | 来源:发表于2019-05-19 23:25 被阅读0次
    • 1.pipeline的初始化

    之前我们分析过,每构造一个channel的时候会通过newChannelPipeline初始化一个pipeline;

     protected AbstractChannel(Channel parent, ChannelId id) {
            this.parent = parent;
            this.id = id;
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    

    newChannelPipeline的实现逻辑,this 是当前的channel

    protected DefaultChannelPipeline newChannelPipeline() {
           return new DefaultChannelPipeline(this);
       }
    
    protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise = new VoidChannelPromise(channel, true);
    
            //创建tail和head节点
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            //构造一个双向链表的数据结构,包含head和tail两个节点,链表的元素其实是ChannelHandlerContext
            head.next = tail;
            tail.prev = head;
        }
    

    总结下,pipeline的创建是在创建channel的时候就创建了。

      1. ChannelHandlerContext 解析
        首先看下ChannelHandlerContext的类继承关系
        public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker

    包含三层含义:

    • extends AttributeMap : 自身可以存储一些属性;
    • extends ChannelInboundInvoker:可以触发一些用户事件,包括读事件,注册事件等;
    • extends ChannelOutboundInvoker: 可以触发一些用户事件,包括写事件
        Channel channel();
        EventExecutor executor();
        String name();
        ChannelHandler handler();
        boolean isRemoved();
        ChannelPipeline pipeline();
        ByteBufAllocator alloc();
    
    • ChannelHandlerContext本身包含获取当前的channel,获取当前的NioEventloop,当前属于哪个ChannelHandler等等;

    • 3.HeadContext 和 TailContext

    • 3.1 TailContext继承AbstractChannelHandlerContext

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
           TailContext(DefaultChannelPipeline pipeline) {
               super(pipeline, null, TAIL_NAME, true, false);
               setAddComplete();
           }
    }
    
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
            //当前context的名字  
            this.name = ObjectUtil.checkNotNull(name, "name");
           //  当前context所属的pipeline
            this.pipeline = pipeline;
           //  当前context所属的NioEventLoop
            this.executor = executor;
           //  标示是inboundHandler还是outboundHandler
            this.inbound = inbound;
            this.outbound = outbound;
            // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
            ordered = executor == null || executor instanceof OrderedEventExecutor;
        }
    

    可以看出TailContext是一个inBound处理器,用于处理读事件,注册事件;

    通过cas+自悬操作将当前节点设置为已经添加

    final void setAddComplete() {
            for (; ; ) {
                int oldState = handlerState;
                if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                    return;
                }
            }
        }
    
    • 3.2 HeadContext

    构造函数比TailContext多了一个unsafe属性,其余的都相同

    final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    
            //unsafe 属性实现底层数据的读写
            private final Unsafe unsafe;
    
            HeadContext(DefaultChannelPipeline pipeline) {
                super(pipeline, null, HEAD_NAME, false, true);
                unsafe = pipeline.channel().unsafe();
                setAddComplete();
            }
    }
    

    基本实现了父类的方法,包含读写,注册,异常传播等;

    • 4.ChannelHandler的添加与删除
    • 4.1 添加channelhandler
      在业务代码中我们一般添加handler都是通过这样的方式进行添加
      ch.pipeline().addLast(new EchoServerHandler());
    

    接下来我们看下addLast方法中都做了哪些操作?

    public final ChannelPipeline addLast(ChannelHandler... handlers) {
         return addLast(null, handlers);
    }
    
     @Override
        public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
            if (handlers == null) {
                throw new NullPointerException("handlers");
            }
            //一个一个添加
            for (ChannelHandler h : handlers) {
                if (h == null) {
                    break;
                }
                addLast(executor, null, h);
            }
            return this;
        }
    
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                //1.判断是否重复添加
                checkMultiplicity(handler);
    
                //2.构造一个HandlerContext,如果有同名则抛异常
                newCtx = newContext(group, filterName(name, handler), handler);
    
                //3.添加HandlerContext
                addLast0(newCtx);
    
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                //4.如果当前线程是EventLoop,则异步触发HandlerAdded0事件,否则直接触发
                if (!executor.inEventLoop()) {
                    newCtx.setAddPending();
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerAdded0(newCtx);
                        }
                    });
                    return this;
                }
            }
            callHandlerAdded0(newCtx);
            return this;
        }
    
    private static void checkMultiplicity(ChannelHandler handler) {
           //判断是不是ChannelHandlerAdapter的实例
            if (handler instanceof ChannelHandlerAdapter) {
                ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
                //如果当前handler不是用Sharable注解的并且已经添加了,则直接抛异常
                if (!h.isSharable() && h.added) {
                    throw new ChannelPipelineException(
                            h.getClass().getName() +
                                    " is not a @Sharable handler, so can't be added or removed multiple times.");
                }
                //否则,标示已经添加
                h.added = true;
            }
        }
    

    构造一个DefaultChannelHandlerContext对象

     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
            return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
        }
    
    DefaultChannelHandlerContext(DefaultChannelPipeline pipeline,EventExecutor executor, String name, ChannelHandler handler) {
            super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
            if (handler == null) {
                throw new NullPointerException("handler");
            }
            this.handler = handler;
        }
    

    执行添加操作,就是往链表中插入一个元素

       private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
    

    添加完成,触发该handler的一个handlerAdded事件,并设置当前handler已经添加

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
    }
    
    • 4.2 删除channelHandler
        public final ChannelPipeline remove(ChannelHandler handler) {
            remove(getContextOrDie(handler));
            return this;
        }
    

    拿到channelHandler节点

    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
            AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
            if (ctx == null) {
                throw new NoSuchElementException(handler.getClass().getName());
            } else {
                return ctx;
            }
        }
    
    public final ChannelHandlerContext context(ChannelHandler handler) {
            if (handler == null) {
                throw new NullPointerException("handler");
            }
    
            //从头开始遍历节点,无限for循环,如果遍历到则返回,否则返回null
            AbstractChannelHandlerContext ctx = head.next;
            for (; ; ) {
                if (ctx == null) {
                    return null;
                }
                if (ctx.handler() == handler) {
                    return ctx;
                }
                ctx = ctx.next;
            }
        }
    

    移除节点,触发HandlerRemoved事件

    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
          //当前节点不是头节点和尾节点,因为要保证线程安全,必须保证pipeline的结构
            assert ctx != head && ctx != tail;
    
            synchronized (this) {
                remove0(ctx);
                if (!registered) {
                    callHandlerCallbackLater(ctx, false);
                    return ctx;
                }
    
                EventExecutor executor = ctx.executor();
                if (!executor.inEventLoop()) {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            callHandlerRemoved0(ctx);
                        }
                    });
                    return ctx;
                }
            }
            callHandlerRemoved0(ctx);
            return ctx;
        }
    

    移除节点的操作

    private static void remove0(AbstractChannelHandlerContext ctx) {
            AbstractChannelHandlerContext prev = ctx.prev;
            AbstractChannelHandlerContext next = ctx.next;
            prev.next = next;
            next.prev = prev;
        }
    

    调用对应Handler的remove方法,最后标示该handler已经remove,设置remove的标示

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
            try {
                try {
                    ctx.handler().handlerRemoved(ctx);
                } finally {
                    ctx.setRemoved();
                }
            } catch (Throwable t) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
            }
        }
    
    final void setRemoved() {
            handlerState = REMOVE_COMPLETE;
    }
    

    pipeline的操作就讲到这里了。

    相关文章

      网友评论

          本文标题:2019-05-19 pipeline 初始化、新增、删除操作

          本文链接:https://www.haomeiwen.com/subject/qweczqtx.html