概述
前面两节分析了编码器,本节来看下netty的编码;以及netty是如何把JAVA对象转成字节流写出去的
示例代码
先来看一个简单的代码示例
服务端EchoServerHandler.channelRead构造对象返回
public class EchoServerHandler extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
User user = new User("zzzliu", 18);
ctx.channel().writeAndFlush(user);
}
}
创建EchoServerEncodeHandler,编码格式为 length(4) + age(4) + age(可变长度)
/**
* +---+----+------+----+
* | 4 | 4 | ? |
* +---+----+------+----+
* | length | age | name |
* +---+----+------+----+
*/
public class EchoServerEncodeHandler extends MessageToByteEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) {
out.writeInt(4 + user.getName().getBytes().length);
out.writeInt(user.getAge());
out.writeBytes(user.getName().getBytes());
}
}
服务端启动加入handler
p.addLast(new EchoServerEncodeHandler(), new EchoServerHandler());
-
EchoServerHandler.channelRead
接收客户端数据不做处理直接构造一个User对象交给pipeline传递writeAndFlush事件; -
EchoServerEncodeHandler
继承MessageToByteEncoder<User>
在重写encode
里面实现编码逻辑 - EchoServerHandler中
ctx.channel().writeAndFlush(user);
开始写出逻辑
写出入口writeAndFlush
ctx.channel().writeAndFlush(user);
从pipeline尾部节点TailContext开始传播
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
//通过MASK找到MASK_ONLY_OUTBOUND handler
final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//flush默认true
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
//相当于先调用write,再调用flush
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
总结一下就是先传播write事件(先ChannelOutboundBuffer的Entry链表上)紧接着再传播flush事件(再遍历链表调用jdkchannel写出到内核socket发送缓冲区中),下面分别对pipeline中重要的write和flush节点进行分析
MessageToByteEncoder.write
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 通过msg类型判断当前Handelr是否能处理写入的消息
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
//强转成MessageToByteEncoder泛型
I cast = (I) msg;
// 分配ButeBuf
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 调用子类encode(具体的编码逻辑)
encode(ctx, cast, buf);
} finally {
// cast对象已经转换成ByteBuf了,释放掉
ReferenceCountUtil.release(cast);
}
// 如果buf中写入了数据,就把buf传到下一个节点
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
// 否则,释放buf,将空数据传到下一个节点
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
// 当buf在pipeline中处理完之后,释放
if (buf != null) {
buf.release();
}
}
}
-
acceptOutboundMessage(msg)
通过msg(MessageToByteEncoder的泛型)类型判断当前Handelr是否能处理写入的消息,不能处理的话直接传递到下一节点 -
allocateBuffer(ctx, cast, preferDirect);
分配ByteBuf(preferDirect默认为true即分配directBuf,否则分配heapBuf) - 调用子类encode(示例代码中
EchoServerEncodeHandler.encode
)执行具体编码逻辑,把Java对象按照自定义字节流写到ByteBuf中 - ByteBuf有数据则传递到下一节点处理,否则释放ByteBuf
-
buf.release();
最后不论是否执行成功都要释放ByteBuf - write事件最终传播到HeadContext节点
HeadContext.write
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
直接委托给unsafe处理AbstractUnsafe.write
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
int size;
try {
//把非直接内存转换成直接内存DirectBuffer
msg = filterOutboundMessage(msg);
//估算出需要写入的ByteBuf的size
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
//msg先插入outboundBuffer中队列
outboundBuffer.addMessage(msg, size, promise);
}
-
filterOutboundMessage
把非直接内存转换成直接内存DirectBuffer -
pipeline.estimatorHandle().size(msg);
估算出需要写入的ByteBuf的size -
addMessage
msg先插入outboundBuffer中队列
ChannelOutboundBuffer
public void addMessage(Object msg, int size, ChannelPromise promise) {
//msg包装成Entry组成一个单向链表
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
//entry.pendingSize = size + 96
//统计当前有多少字节需要被写出
incrementPendingOutboundBytes(entry.pendingSize, false);
}
//第一个被写到操作系统Socket缓冲区中的节点
private Entry flushedEntry;
//第一个未被写入到操作系统Socket缓冲区中的节点
private Entry unflushedEntry;
//ChannelOutboundBuffer缓冲区的最后一个节点
private Entry tailEntry;
- msg包装成Entry组成一个单向链表,有多个Entry被添加之后链表flushedEntry = null; unflushedEntry到tailEntry之间为等待flush的msg
-
incrementPendingOutboundBytes
统计当前有多少字节需要被写出,如果已经超过阈值, 告诉调用者请不要再写入了,等flush后,会将阈值递减恢复,并发出可写事件
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 采用CAS更新newWriteBufferSize字段,
// 判断当前是否超过阈值,超过则触发ChannelWritabilityChanged事件。
//TOTAL_PENDING_SIZE_UPDATER:当前缓冲区有多少待写的字节
//channel.config().getWriteBufferHighWaterMark():buf字节容量最大值,默认64K
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
//设置不可写状态为不可写
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
//传播ChannelWritabilityChanged事件,表示buf已经写满,需要进行处理
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
累加待flush字节数(newWriteBufferSize),如果大于阈值(默认64K),则设置不可写状态为不可写并触发fireChannelWritabilityChanged
事件,表示buf已经写满,需要进行处理
HeadContext.flush
flush流程跟write类似,最终都是通过HeadContext进行处理;HeadContext委托到AbstractUnsafe.flush
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
//调整unFushedEntry、fushedEntry指针,变成unFushedEntry = null,fushedEntry -> tailEntry中间节点表示可以flush
//说明当前outboundBuffer中所有Entry都可以flush
outboundBuffer.addFlush();
//进行flush
flush0();
}
protected void flush0() {
.........
doWrite(outboundBuffer);
.........
}
ChannelOutboundBuffer
public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
//计数并判断是否需要改变可写状态
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
//channel.config().getWriteBufferLowWaterMark():默认32K
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
//设置不可写状态为可写,并传播fireChannelWritabilityChanged事件
setWritable(invokeLater);
}
}
public boolean remove() {
//第一个flushedEntry节点
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
//把当前Entry移除,同时flush指针后移
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}
// 回收Entry
e.recycle();
return true;
}
AbstractNioByteChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
//自旋锁次数,默认16次
int writeSpinCount = config().getWriteSpinCount();
do {
//msg:flushedEntry指针
Object msg = in.current();
if (msg == null) {
// 所有数据都flush完成,那么就将其写标志删除
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
//buf内没有数据,直接remove
if (!buf.isReadable()) {
in.remove();
return 0;
}
//将ByteBuf写出到jdk nio的Channel即通过jdk channel写到内核socket缓冲区,返回写入字节数
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
//buf数据写完了,从ChannelOutboundBuffer的Entry链表中删除
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
}
}
网友评论