美文网首页
(八)netty是如何进行写数据的

(八)netty是如何进行写数据的

作者: guessguess | 来源:发表于2021-05-20 10:53 被阅读0次

    我们在使用Netty的时候,对于写数据,其实很简单的,只需要实现ChannelOutboundHandler接口就可以了。
    随后将这个handler加入到channel的channelPipeline中即可,就可以完成自定义的数据写入。

    public class WriteHandler implements ChannelOutboundHandler{
    
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // TODO Auto-generated method stub
            
        }
        ....还有若干其他需要覆写的方法
    }
    

    那么netty是如何完成这个操作的?
    这里我们还是先看看channel是如何去写入的,先看看channel的结构。
    这里可以看出,channel接口本身继承了ChannelOutboundInvoker这个接口。

    Channel

    public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    }
    

    ChannelOutboundInvoker

    public interface ChannelOutboundInvoker {
        ChannelFuture write(Object msg, ChannelPromise promise);
        ChannelFuture writeAndFlush(Object msg);
    }
    

    从上面单纯的看结构,其实什么都看不出来,然后有一个很重要的点。channel的公共实现都是在AbstractChannel中。
    所以下面看看AbstractChannel中的代码

    AbstractChannel

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        @Override
        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
            return pipeline.writeAndFlush(msg, promise);
        }
    }
    

    从这里一看,是通过channel的pipeline去写入。由于channel的pipeline的类型是DefaultChannelPipeline。
    所以下面看看这个类中的实现

    DefaultChannelPipeline

    public class DefaultChannelPipeline implements ChannelPipeline {
        @Override
        public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
            return tail.writeAndFlush(msg, promise);
        }
    }
    

    从这里看出,是从channel的pipeline中的尾巴开始进行写入。由于tail并没有去实现writeAndFlush方法,所以这个方法的实现是在父类。下面要说到一个比较重要的类AbstractChannelHandlerContext。当我们的channel
    从上面的代码也可以看出,outbound其实都是从tail进行传播的,所以outbound事件都是逆序的。

    AbstractChannelHandlerContext

    abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
        volatile AbstractChannelHandlerContext next;
        volatile AbstractChannelHandlerContext prev;
    
        @Override
        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
            write(msg, true, promise);
            return promise;
        }
        
        private void write(Object msg, boolean flush, ChannelPromise promise) {
            ......................................省略无关紧要的代码。
            //找到下一个属性为outbound的ctx
            final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                    (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
            final Object m = pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    //去执行
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    //去执行
                    next.invokeWrite(m, promise);
                }
            } else {
                final AbstractWriteTask task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, m, promise);
                }  else {
                    task = WriteTask.newInstance(next, m, promise);
                }
                if (!safeExecute(executor, task, promise, m)) {
                    task.cancel();
                }
            }
        }
    
    
        。。。从执行方法开始,最后定位到此处。
        private void invokeWrite0(Object msg, ChannelPromise promise) {
            try {
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }
    }
    

    从上面的代码看出,其实像写入这个操作,一开始从channel发起。

    1.从channel的Pipeline的tail发起。

    2.由于tail自身并没有覆写write方法,所以是父类,AbstractChannelHandlerContext中实现的。(这里其实主要是起到一个传播的作为,找到tail的下一个属性为outbound的ctx)。

    3.找到ctx之后,将ctx转化为channelHandler,再去调用write方法。

    一般channelHandler的write方法,其实也是利用ctx.write,说白了就是,这个channelHandler不对写操作做什么处理,只是简单做一个传递的操作,就又回到第二步。
    举个例子,像这个channelHandler只是做了传播的作用。另外也可以去做编码。

    传播
    public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
        @Skip
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ctx.write(msg, promise);
        }
    }
    
    编码
    public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
         自定义的编码方式,其实说白了就是将自己编码后的数据,写入到bytebuf
        protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ByteBuf buf = null;
            try {
                if (acceptOutboundMessage(msg)) {
                    @SuppressWarnings("unchecked")
                    I cast = (I) msg;
                    buf = allocateBuffer(ctx, cast, preferDirect);
                    try {
                        进行编码,将编码后的数据写入到buf中
                        encode(ctx, cast, buf);
                    } finally {
                        ReferenceCountUtil.release(cast);
                    }
    
                    if (buf.isReadable()) {
                        因为此ctx类型为abstractChannelHandlerContext。并没有覆写ctx的write方法,所以只是起传播作用
                        ctx.write(buf, promise);
                    } else {
                        buf.release();
                       因为此ctx类型为abstractChannelHandlerContext。并没有覆写ctx的write方法,所以只是起传播作用
                        ctx.write(Unpooled.EMPTY_BUFFER, promise);
                    }
                    buf = null;
                } else {
                    因为此ctx类型为abstractChannelHandlerContext。并没有覆写ctx的write方法,所以只是起传播作用
                    ctx.write(msg, promise);
                }
            } catch (EncoderException e) {
                throw e;
            } catch (Throwable e) {
                throw new EncoderException(e);
            } finally {
                if (buf != null) {
                    buf.release();
                }
            }
        }
    }
    
    4.写入

    通过2,3步骤的反复操作。最后来到headContext。
    可以看看headContext的实现。所以我们可以看到,最后将数据写入到内存的,是headContext。
    到这里整个写入的过程就完成了。

    public class DefaultChannelPipeline implements ChannelPipeline {
        final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                unsafe.write(msg, promise);
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:(八)netty是如何进行写数据的

          本文链接:https://www.haomeiwen.com/subject/nhwpjltx.html