介绍
本编文章我们深入探究下使用netty通信的双方是如何写数据到网络
说明
我们基于NIO进行分析
写数据到网络
我先分析使用netty通信的一端是如何实现发送给另一端的,我假设一个场场景:自定义的一个channelHandler在channelActive方法被触发的时候通过channelContext向通信对端输出hello world,代码如下
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//Unpooled.copiedBuffer 的作用就是把输入的字符串包装成ByteBuf
ctx.writeAndFlush(Unpooled.copiedBuffer("hello world".getBytes()));
}
}
我看下ctx.writeAndFlush的调用链
writeAndFlush(msg) --> writeAndFlush(msg,promise) --> write(msg,flush,promise)
上面调用链上的方法都在ChannelHandlerContext中,我看下write方法的源代码
//在我们的例子中flush=true
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
//findContextOutbound方法的目的是在pipeline链上从本节点开始向head方向去找到executionMask符合MASK_WRITE | MASK_FLUSH的outBoundContext
//然后触发这个outBoundContext.invokeWriteAndFlush方法
//outBoundContext.invokeWriteAndFlush又会调用其绑定的outBoundHandler.write方法
//最后一个满足条件的outBoundContext是HeadText
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
//如果提交写操作的线程和当前NioEventLoop绑定的线程不同,那么向NioEventLoop提交一个写数据的任务
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}
如果不明白上面说的netty事件处理是怎么回事可以查看https://www.jianshu.com/p/36803adcbc02
如何不明白上面提到的netty线程的问题可以查看https://www.jianshu.com/p/732f9dea34d7
我们看下AbstractChannelHandlerContext.invokeWriteAndFlush的源代码
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
//invokeWrite0会调用channelHandlerContext绑定的handler的write方法
invokeWrite0(msg, promise);
//invokeFlush0会调用channelHandlerContext绑定的handler的flush方法
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
HeadContext(本身实现了handler接口)是真正是实现把netty数据写出去的handler,我们分析HeadContext的write方法和flush方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
又看到了熟悉的unsafe了,我先分析unsafe.write的源代码
unsafe.write
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
//在数据被写出到网络上之前,所有待写出的数据都会存放在outboundBuffer中,这个类相当于发送缓存
//将来flush就是从outboundBuffer取出待写出数据写出到网络上
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, newClosedChannelException(initialCloseCause));
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
//filterOutboundMessage的作用判断输入的msg是不是ByteBuf或者FileRegion类型,如果不是那么就抛出异常
//如果msg是ByteBuf类型但是不是DirectByteBuf类型,那么还会把msg转化成DirectByteBuf类型
//这里面会涉及到netty的内存管理相关的知识,请看我写的另一篇文章[https://www.jianshu.com/p/550704d5a628](https://www.jianshu.com/p/550704d5a628)
msg = filterOutboundMessage(msg);
//计算msg的字节大小
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
//把msg加入到outboundBuffer中
outboundBuffer.addMessage(msg, size, promise);
}
ChannelOutboundBuffer是用户写出数据的缓存类,数据在写出之前都会被加入到ChannelOutboundBuffer中,我再分析ChannelOutboundBuffer.addMessage方法之前,先看下ChannelOutboundBuffer的一些关键属性
public final class ChannelOutboundBuffer {
// Assuming a 64-bit JVM:
// - 16 bytes object header
// - 6 reference fields
// - 2 long fields
// - 2 int fields
// - 1 boolean field
// - padding
//ChannelOutboundBuffer对象占用的字节数(如果不明白学习下JOL)
static final int
CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
@Override
protected ByteBuffer[] initialValue() throws Exception {
return new ByteBuffer[1024];
}
};
//所属的SocketChannel
private final Channel channel;
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//
// The Entry that is the first in the linked-list structure that was flushed
//将要被被flush的msg链表
private Entry flushedEntry;
// The Entry which is the first unflushed in the linked-list structure
//新添加的msg会被加入unflushedEntry链表
private Entry unflushedEntry;
// The Entry which represents the tail of the buffer
//指向最新加入缓存链的msg
private Entry tailEntry;
// The number of flushed entries that are not written yet
private int flushed;
private int nioBufferCount;
private long nioBufferSize;
private boolean inFail;
//目前缓存数据的总字节数
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;
//当前缓存链表是不是不可写状态
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;
看过了ChannelOutboundBuffer的属性,我们分析下ChannelOutboundBuffer.addMessage方法
addMessage
public void addMessage(Object msg, int size, ChannelPromise promise) {
//把msg对象封装成缓存节点对象,注意这里的Entry是可回收对象
//关于netty的对象复用机制我在另一篇文章中有分析[https://www.jianshu.com/p/8f629e93dd8c](https://www.jianshu.com/p/8f629e93dd8c)
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
//如果tailEntry不为空,把msg的entry加入到缓存链表的末尾
Entry tail = tailEntry;
tail.next = entry;
}
//tailEntry指向最新加入的entry
tailEntry = entry;
if (unflushedEntry == null) {
//如果unflushedEntry为空,让unflushedEntry指向新加入的entry
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
//更新缓存中的数据总字节大小
incrementPendingOutboundBytes(entry.pendingSize, false);
}
上面就是unsafe.write的过程分析,我们看到unsafe.write就是把用户写出的数据放到了ChannelOutboundBuffer代表的缓存链表中,netty对写出缓存配置HighWaterMark和LowWaterMark两个表示容量界限的参数,当ChannelOutboundBuffer缓存的数据量大于HighWaterMark的时候,ChannelOutboundBuffer的unwritable会被设置成1,同事会触发fireChannelWritabilityChanged事件。incrementPendingOutboundBytes方法是实现这个判断逻辑
接下来我们分析缓存数据是如何被刷到网络上的
unsafe.flush
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
//addFlush的作用就是把unflushedEntry链表上的数据转移到flushedEntry链表上
outboundBuffer.addFlush();
//写数据到网络
flush0();
}
我们先分析outboundBuffer.addFlush()
outboundBuffer.addFlush()
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
//记录本次需要写出到网络上的msg总数
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
//清空unflushedEntry
unflushedEntry = null;
}
}
flush0
这个是真正写数据到网络的入口方法
protected void flush0() {
//inFlush0表示本channel是不是正在写数据到网络,如果是,那么就返回
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
//设置inFlush0=true表示本SocketChannel正在写数据到网络
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
//判断底层的SocketChannel是不是处于激活状态,如果没有激活那么需要设置本次刷新数据到网络的操作为失败状态
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
//把数据刷到网络的核心方法
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
initialCloseCause = t;
close(voidPromise(), t, newClosedChannelException(t), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newClosedChannelException(t), false);
}
}
} finally {
inFlush0 = false;
}
}
doWrite 源代码
doWrite
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
//获取java底层的SocketChannel
SocketChannel ch = javaChannel();
//一次最多可以执行多少次刷出数据到网络操作
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
//如果SocketChannel在selector上注册了OP_WRITE事件,那么写完数据之后,取消SocketChannel注册的OP_WRITE事件
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
// Ensure the pending writes are made of ByteBufs only.
//每一次写操作最多可以写多少数据
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
//初始化写操作ByteBuffer数组,in.nioBuffers方法我们在下面详解
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
//通过in.nioBuffer得到的ByteBuffer数组的长度
int nioBufferCnt = in.nioBufferCount();
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
//ByteBuffer数组长度是0
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
//比如写出的数据是FileRegion类型而不是ByteBuffer类型
writeSpinCount -= doWrite0(in);
break;
//只有一个ByteBuffer的数据待刷到网络上
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
//取得待刷ByteBuffer
ByteBuffer buffer = nioBuffers[0];
//buffer需要刷到网络上的字节数
int attemptedBytes = buffer.remaining();
//java底层的SocketChannel写出数据到网络上,返回写出的数据量
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
//如果SocketChannel没把数据写出去,说socket tcp写缓存已经满了
//那么通过incompleteWrite方法使得SocketChannel向selector注册OP_WRITE,等待socket tcp 写缓存可用
incompleteWrite(true);
return;
}
//根据试图写入的数据量attemptedBytes,实际写入的数据量localWrittenBytes动态修改单次最大容许写入数据量maxBytesPerGatheringWrite
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
//根据实际的写入数据量更新ChannelOutboundBuffer的缓存状态
//如果localWrittenBytes等于attemptedBytes那么把对应的entry从缓存节点中删除
//如果localWrittenBytes不等于attemptedBytes说明本次写入没有把一个msg中包含的数据完整的写到网络上,
//那么需要更新msg对应的ByteBuf读写指针的位置,等待下次继续执行
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
//如果一次需要写出多个ByteBuffer
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
//算出多个待写出ByteBuffer的大小
long attemptedBytes = in.nioBufferSize();
//SocketChannel写出ByteBuffers到网络,返回写出的数据量
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
//同上
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
//同上
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
//同上
in.removeBytes(localWrittenBytes);
//writeSpinCount减去1
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
//如果writeSpinCount < 0,说明ChannelOutboundBuffer上还有一些缓存节点没有被刷到网络
incompleteWrite(writeSpinCount < 0);
}
我们分析下ChannelOutboundBuffer.nioBuffers,这个方法是用来把缓存链上的缓存节点转换成ByteBuffer,将来SocketChannel直接把ByteBuffer刷到网络上
nioBuffers
//maxCount用来设置本次缓存节点转化出的ByteBuffer数组的最大长度
//maxBytes用来设置转化出的ByteBuffer的最大容量,如果转化出的ByteBuffer的容量已经超过maxBytes,那么即使ChannelOutboundBuffer上还有缓存节点,在本次转化中都不再进行转化
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
assert maxCount > 0;
assert maxBytes > 0;
long nioBufferSize = 0;
int nioBufferCount = 0;
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
//NIO_BUFFERS.get默认返回一个长度为1024的ByteBuffer数组
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
Entry entry = flushedEntry;
//判断entry是不是可以flush的,while循环会一直取缓存链上的缓存节点
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
if (!entry.cancelled) {
ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex();
//计算当前缓存节点的大小
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes > 0) {
//在nioBufferCount!=0 的情况下,如果最大容许写出的数据量小于本次待转化为ByteBuffer的数据量加上已经转化成ByteBuffer的数据量,那么就不再继续做ByteBuffer转化了
if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
// If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
// we stop populate the ByteBuffer array. This is done for 2 reasons:
// 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
// and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
// on the architecture and kernel but to be safe we also enforce the limit here.
// 2. There is no sense in putting more data in the array than is likely to be accepted by the
// OS.
//
// See also:
// - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
// - http://linux.die.net/man/2/writev
break;
}
//更新已经转化成ByteBuffer的缓存节点的总字节大小
nioBufferSize += readableBytes;
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
// buf.nioBufferCount表示ByteBuf绑定的java ByteBuffer的个数,默认为1
entry.count = count = buf.nioBufferCount();
}
//nioBufferCount表示已经转化出的ByteBuffer的个数,因为在目前netty这个版本中nioBuffers.length的初始值是等于maxCount的,
//所以expandNioBufferArray没有机会得到执行
int neededSpace = min(maxCount, nioBufferCount + count);
if (neededSpace > nioBuffers.length) {
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
//ByteBuf.internalNioBuffer就是返回ByteBuf绑定的ByteBuffer(position=readerIndex, limit = readerIndex+readableBytes)的副本
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
//设置nioBuffers[nioBufferCount]为当前转化出的ByteBuffer
//同时更新已经转化出的ByteBuffer的数量
nioBuffers[nioBufferCount++] = nioBuf;
} else {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
//如果一个Entry包含多个ByteBuffer那么通过nioBuffers转化
//但是这个地方在什么情况下会被调用我还没看到,而且没有看到在这种情况下如何处理entry的代码
nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
}
//如果转化的ByeBuffer的个数已经大于等于了maxCount那么结束转化
if (nioBufferCount >= maxCount) {
break;
}
}
}
entry = entry.next;
}
//记录本次转化的ByteBuffer的个数
this.nioBufferCount = nioBufferCount;
//记录本次转化出的ByteBuffer的总大小
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
上面转化出的ByteBuffer[],会被SocketChannel写出到网络上。
我们再看下当一个缓存节点数据被写出到网络后,这个缓存节点会被如何处理
removeBytes
当缓存节点entry缓存的数据被全部或部分刷到网络上之后,entry节点在缓存链上的状态是怎么样的呢
public void removeBytes(long writtenBytes) {
for (;;) {
//获取当前缓存节点对应的msg
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
//获取缓存数据的对应的ByteBuf的readerIndex
final int readerIndex = buf.readerIndex();
//获取ByteBuf保存的缓存数据的大小
final int readableBytes = buf.writerIndex() - readerIndex;
//如果写出到网络上的数据量大于本节点的缓存数据大小
//那么意味着本缓存节点entry已经被全部刷到网络上,所以
//调用remove(),把entry从缓存链上删除
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
//如果用户设置了写入进度通知的功能,progress通知用户写入进度
progress(readableBytes);
//更新写入的数据量
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
//当前缓存节点的数据量大于writtenBytes
//那么需要更新entry对应ByteBuf的readIndex,这时缓存节点entry还是继续保留在缓存链上,等待下一次继续被写出
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
//最后清空转化的ByteBuffer数组
clearNioBuffers();
}
entry节点从缓存链上删除之后,这个时候netty更新totalPendingSize = totalPendingSize - entry缓存大小,更新之后如果totalPendingSize < LowWaterMake会更新ChannelOutboundBuffer.unwritable =0表示当前ChannelOutboundBuffer为可写状态,同时触发fireChannelWritabilityChanged事件
最后我们看一下incompleteWrite方法
incompleteWrite
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
//setOpWrite用来表示是不是需要在selector上注册OP_WRITE事件
//当一次写的过程中没有把ChannelOutboundBuffer中缓存的数据全部刷到网络上,说明底层的socket写缓存已经满了,
//这时候需要向selector注册OP_WRITE事件,等socket写缓存有空间了,将来在继续刷数据到网络
if (setOpWrite) {
//setOpWrite()就是SocketChannel向selector注册OP_WRITE事件
setOpWrite();
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
//如果数据全部刷完了,那么SocketChannel需要清空OP_WRITE
clearOpWrite();
// Schedule flush again later so other tasks can be picked up in the meantime
eventLoop().execute(flushTask);
}
}
分析完成
网友评论