美文网首页
(八)netty是如何进行写数据的

(八)netty是如何进行写数据的

作者: guessguess | 来源:发表于2021-05-20 10:53 被阅读0次

我们在使用Netty的时候,对于写数据,其实很简单的,只需要实现ChannelOutboundHandler接口就可以了。
随后将这个handler加入到channel的channelPipeline中即可,就可以完成自定义的数据写入。

public class WriteHandler implements ChannelOutboundHandler{


    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // TODO Auto-generated method stub
        
    }
    ....还有若干其他需要覆写的方法
}

那么netty是如何完成这个操作的?
这里我们还是先看看channel是如何去写入的,先看看channel的结构。
这里可以看出,channel接口本身继承了ChannelOutboundInvoker这个接口。

Channel

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
}

ChannelOutboundInvoker

public interface ChannelOutboundInvoker {
    ChannelFuture write(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);
}

从上面单纯的看结构,其实什么都看不出来,然后有一个很重要的点。channel的公共实现都是在AbstractChannel中。
所以下面看看AbstractChannel中的代码

AbstractChannel

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return pipeline.writeAndFlush(msg, promise);
    }
}

从这里一看,是通过channel的pipeline去写入。由于channel的pipeline的类型是DefaultChannelPipeline。
所以下面看看这个类中的实现

DefaultChannelPipeline

public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return tail.writeAndFlush(msg, promise);
    }
}

从这里看出,是从channel的pipeline中的尾巴开始进行写入。由于tail并没有去实现writeAndFlush方法,所以这个方法的实现是在父类。下面要说到一个比较重要的类AbstractChannelHandlerContext。当我们的channel
从上面的代码也可以看出,outbound其实都是从tail进行传播的,所以outbound事件都是逆序的。

AbstractChannelHandlerContext

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        write(msg, true, promise);
        return promise;
    }
    
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ......................................省略无关紧要的代码。
        //找到下一个属性为outbound的ctx
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                //去执行
                next.invokeWriteAndFlush(m, promise);
            } else {
                //去执行
                next.invokeWrite(m, promise);
            }
        } else {
            final AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            if (!safeExecute(executor, task, promise, m)) {
                task.cancel();
            }
        }
    }


    。。。从执行方法开始,最后定位到此处。
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
}

从上面的代码看出,其实像写入这个操作,一开始从channel发起。

1.从channel的Pipeline的tail发起。

2.由于tail自身并没有覆写write方法,所以是父类,AbstractChannelHandlerContext中实现的。(这里其实主要是起到一个传播的作为,找到tail的下一个属性为outbound的ctx)。

3.找到ctx之后,将ctx转化为channelHandler,再去调用write方法。

一般channelHandler的write方法,其实也是利用ctx.write,说白了就是,这个channelHandler不对写操作做什么处理,只是简单做一个传递的操作,就又回到第二步。
举个例子,像这个channelHandler只是做了传播的作用。另外也可以去做编码。

传播
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }
}
编码
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
     自定义的编码方式,其实说白了就是将自己编码后的数据,写入到bytebuf
    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    进行编码,将编码后的数据写入到buf中
                    encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {
                    因为此ctx类型为abstractChannelHandlerContext。并没有覆写ctx的write方法,所以只是起传播作用
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                   因为此ctx类型为abstractChannelHandlerContext。并没有覆写ctx的write方法,所以只是起传播作用
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                因为此ctx类型为abstractChannelHandlerContext。并没有覆写ctx的write方法,所以只是起传播作用
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }
}
4.写入

通过2,3步骤的反复操作。最后来到headContext。
可以看看headContext的实现。所以我们可以看到,最后将数据写入到内存的,是headContext。
到这里整个写入的过程就完成了。

public class DefaultChannelPipeline implements ChannelPipeline {
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            unsafe.write(msg, promise);
        }
    }
}

相关文章

网友评论

      本文标题:(八)netty是如何进行写数据的

      本文链接:https://www.haomeiwen.com/subject/nhwpjltx.html