美文网首页
2019-05-19 pipeline 初始化、新增、删除操作

2019-05-19 pipeline 初始化、新增、删除操作

作者: Terminalist | 来源:发表于2019-05-19 23:25 被阅读0次
  • 1.pipeline的初始化

之前我们分析过,每构造一个channel的时候会通过newChannelPipeline初始化一个pipeline;

 protected AbstractChannel(Channel parent, ChannelId id) {
        this.parent = parent;
        this.id = id;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

newChannelPipeline的实现逻辑,this 是当前的channel

protected DefaultChannelPipeline newChannelPipeline() {
       return new DefaultChannelPipeline(this);
   }
protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise = new VoidChannelPromise(channel, true);

        //创建tail和head节点
        tail = new TailContext(this);
        head = new HeadContext(this);

        //构造一个双向链表的数据结构,包含head和tail两个节点,链表的元素其实是ChannelHandlerContext
        head.next = tail;
        tail.prev = head;
    }

总结下,pipeline的创建是在创建channel的时候就创建了。

    1. ChannelHandlerContext 解析
      首先看下ChannelHandlerContext的类继承关系
      public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker

包含三层含义:

  • extends AttributeMap : 自身可以存储一些属性;
  • extends ChannelInboundInvoker:可以触发一些用户事件,包括读事件,注册事件等;
  • extends ChannelOutboundInvoker: 可以触发一些用户事件,包括写事件
    Channel channel();
    EventExecutor executor();
    String name();
    ChannelHandler handler();
    boolean isRemoved();
    ChannelPipeline pipeline();
    ByteBufAllocator alloc();
  • ChannelHandlerContext本身包含获取当前的channel,获取当前的NioEventloop,当前属于哪个ChannelHandler等等;

  • 3.HeadContext 和 TailContext

  • 3.1 TailContext继承AbstractChannelHandlerContext

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
       TailContext(DefaultChannelPipeline pipeline) {
           super(pipeline, null, TAIL_NAME, true, false);
           setAddComplete();
       }
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
        //当前context的名字  
        this.name = ObjectUtil.checkNotNull(name, "name");
       //  当前context所属的pipeline
        this.pipeline = pipeline;
       //  当前context所属的NioEventLoop
        this.executor = executor;
       //  标示是inboundHandler还是outboundHandler
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

可以看出TailContext是一个inBound处理器,用于处理读事件,注册事件;

通过cas+自悬操作将当前节点设置为已经添加

final void setAddComplete() {
        for (; ; ) {
            int oldState = handlerState;
            if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }
  • 3.2 HeadContext

构造函数比TailContext多了一个unsafe属性,其余的都相同

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        //unsafe 属性实现底层数据的读写
        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
}

基本实现了父类的方法,包含读写,注册,异常传播等;

  • 4.ChannelHandler的添加与删除
  • 4.1 添加channelhandler
    在业务代码中我们一般添加handler都是通过这样的方式进行添加
  ch.pipeline().addLast(new EchoServerHandler());

接下来我们看下addLast方法中都做了哪些操作?

public final ChannelPipeline addLast(ChannelHandler... handlers) {
     return addLast(null, handlers);
}
 @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
        //一个一个添加
        for (ChannelHandler h : handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }
        return this;
    }
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //1.判断是否重复添加
            checkMultiplicity(handler);

            //2.构造一个HandlerContext,如果有同名则抛异常
            newCtx = newContext(group, filterName(name, handler), handler);

            //3.添加HandlerContext
            addLast0(newCtx);

            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            //4.如果当前线程是EventLoop,则异步触发HandlerAdded0事件,否则直接触发
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
private static void checkMultiplicity(ChannelHandler handler) {
       //判断是不是ChannelHandlerAdapter的实例
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            //如果当前handler不是用Sharable注解的并且已经添加了,则直接抛异常
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                                " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            //否则,标示已经添加
            h.added = true;
        }
    }

构造一个DefaultChannelHandlerContext对象

 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline,EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

执行添加操作,就是往链表中插入一个元素

   private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

添加完成,触发该handler的一个handlerAdded事件,并设置当前handler已经添加

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
}
  • 4.2 删除channelHandler
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }

拿到channelHandler节点

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }

        //从头开始遍历节点,无限for循环,如果遍历到则返回,否则返回null
        AbstractChannelHandlerContext ctx = head.next;
        for (; ; ) {
            if (ctx == null) {
                return null;
            }
            if (ctx.handler() == handler) {
                return ctx;
            }
            ctx = ctx.next;
        }
    }

移除节点,触发HandlerRemoved事件

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
      //当前节点不是头节点和尾节点,因为要保证线程安全,必须保证pipeline的结构
        assert ctx != head && ctx != tail;

        synchronized (this) {
            remove0(ctx);
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }

移除节点的操作

private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

调用对应Handler的remove方法,最后标示该handler已经remove,设置remove的标示

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        try {
            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }
final void setRemoved() {
        handlerState = REMOVE_COMPLETE;
}

pipeline的操作就讲到这里了。

相关文章

  • 2019-05-19 pipeline 初始化、新增、删除操作

    1.pipeline的初始化 之前我们分析过,每构造一个channel的时候会通过newChannelPipeli...

  • ArryList源码

    ArryList适合查询多 不适合大量新增与删除的操作 属性 初始化 ArryList就是操作数组 add(Obj...

  • Redis 如何高效安全删除大 Hash Key

    使用 SCAN 和 Pipeline 命令删除 Redis 的大 Key 删除操作会导致 Redis 线程阻塞,网...

  • 2022-02-11 git配置

    1.git高层命令 安装 初始化配置 初始化仓库 C(新增) U(修改) D(删除 & 重命名) R(查询) 分支...

  • 5-5 DOM结构操作

    5-5 DOM结构操作(新增删除移动,获取父子元素) 新增节点 获取父元素 获取子元素 删除节点 移动 其他比如遍...

  • EasyUi之datagrid常见使用

    要点: 1、初始化2、加载数据3、处理数据(选择、新增、删除、清空) 一、初始化 二、加载数据 三、处理数据(选择...

  • netty系列之(五)——ChannelPipeline与Cha

    一、pipeline初始化 Pipeline在创建Channel的时候被创建调用newChannelPipelin...

  • es 批量删除

    es.bulk 可用于批量删除 更新 新建操作 根据id批量删除示例代码: 批量新增: 批量更新

  • redis内容扩展:一文看懂

    redis内容扩展 Pipeline 注意:使用Pipeline的操作是非原子操作 GEO GEOADD loca...

  • git

    初始化操作 版本回退 checkout 撤销文件内容 删除文件 分支

网友评论

      本文标题:2019-05-19 pipeline 初始化、新增、删除操作

      本文链接:https://www.haomeiwen.com/subject/qweczqtx.html