我们在使用Netty的时候,对于写数据,其实很简单的,只需要实现ChannelOutboundHandler接口就可以了。
随后将这个handler加入到channel的channelPipeline中即可,就可以完成自定义的数据写入。
public class WriteHandler implements ChannelOutboundHandler{
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// TODO Auto-generated method stub
}
....还有若干其他需要覆写的方法
}
那么netty是如何完成这个操作的?
这里我们还是先看看channel是如何去写入的,先看看channel的结构。
这里可以看出,channel接口本身继承了ChannelOutboundInvoker这个接口。
Channel
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
}
ChannelOutboundInvoker
public interface ChannelOutboundInvoker {
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
}
从上面单纯的看结构,其实什么都看不出来,然后有一个很重要的点。channel的公共实现都是在AbstractChannel中。
所以下面看看AbstractChannel中的代码
AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return pipeline.writeAndFlush(msg, promise);
}
}
从这里一看,是通过channel的pipeline去写入。由于channel的pipeline的类型是DefaultChannelPipeline。
所以下面看看这个类中的实现
DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return tail.writeAndFlush(msg, promise);
}
}
从这里看出,是从channel的pipeline中的尾巴开始进行写入。由于tail并没有去实现writeAndFlush方法,所以这个方法的实现是在父类。下面要说到一个比较重要的类AbstractChannelHandlerContext。当我们的channel
从上面的代码也可以看出,outbound其实都是从tail进行传播的,所以outbound事件都是逆序的。
AbstractChannelHandlerContext
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
......................................省略无关紧要的代码。
//找到下一个属性为outbound的ctx
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
//去执行
next.invokeWriteAndFlush(m, promise);
} else {
//去执行
next.invokeWrite(m, promise);
}
} else {
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
task.cancel();
}
}
}
。。。从执行方法开始,最后定位到此处。
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
}
从上面的代码看出,其实像写入这个操作,一开始从channel发起。
1.从channel的Pipeline的tail发起。
2.由于tail自身并没有覆写write方法,所以是父类,AbstractChannelHandlerContext中实现的。(这里其实主要是起到一个传播的作为,找到tail的下一个属性为outbound的ctx)。
3.找到ctx之后,将ctx转化为channelHandler,再去调用write方法。
一般channelHandler的write方法,其实也是利用ctx.write,说白了就是,这个channelHandler不对写操作做什么处理,只是简单做一个传递的操作,就又回到第二步。
举个例子,像这个channelHandler只是做了传播的作用。另外也可以去做编码。
传播
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
}
编码
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
自定义的编码方式,其实说白了就是将自己编码后的数据,写入到bytebuf
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
进行编码,将编码后的数据写入到buf中
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
因为此ctx类型为abstractChannelHandlerContext。并没有覆写ctx的write方法,所以只是起传播作用
ctx.write(buf, promise);
} else {
buf.release();
因为此ctx类型为abstractChannelHandlerContext。并没有覆写ctx的write方法,所以只是起传播作用
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
因为此ctx类型为abstractChannelHandlerContext。并没有覆写ctx的write方法,所以只是起传播作用
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
}
4.写入
通过2,3步骤的反复操作。最后来到headContext。
可以看看headContext的实现。所以我们可以看到,最后将数据写入到内存的,是headContext。
到这里整个写入的过程就完成了。
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
}
}
网友评论