每个 ChannelSocket 的 Unsafe 都有一个绑定的 ChannelOutboundBuffer , Netty 向站外输出数据的过程统一通过 ChannelOutboundBuffer 类进行封装,目的是为了提高网络的吞吐量,在外面调用 write 的时候,数据并没有写到 Socket,而是写到了 ChannelOutboundBuffer 这里,当调用 flush 的时候,才真正的向 Socket 写出。
ChannelOutboundBuffer 类属性
public final class ChannelOutboundBuffer {
// Assuming a 64-bit JVM:
// - 16 bytes object header
// - 8 reference fields
// - 2 long fields
// - 2 int fields
// - 1 boolean field
// - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
private final Channel channel;
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
//
// 缓存链表中被刷新的第一个元素
private Entry flushedEntry;
// 缓存链表中中第一个未刷新的元素
private Entry unflushedEntry;
// 缓存链表中的尾元素
private Entry tailEntry;
// 刷新但还没有写入到 socket 中的数量
private int flushed;
从类的属性中可以看出 ChannelOutboundBuffer 定义了一个链表进行存储写入的数据。
1、ChannelOutboundBuffer 添加了N个 Entry
2、当 flush 后的效果
3、flush 后把所有的数据都写入 Socket 中
4、当再次写入 Entry 后
下面针对操作这些操作进行分析,添加、刷新、删除等操作。
addMessage() 添加操作
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
1、创建一个 新的Entry。
2、判断 tailEntry 是否为 null,如果为 null 说明链表为空。则把 flushedEntry 置为null。
3、如果 tailEntry 不为空,则把新添加的 Entry 添加到 tailEntry 后面 。
4、 将新添加的 Entry 设置为 链表的 tailEntry。
5、如果 unflushedEntry 为null,说明没有未被刷新的元素。新添加的Entry 肯定是未被刷新的,则把当前 Entry 设置为 unflushedEntry 。
6、统计未被刷新的元素的总大小。
addFlush() 刷新操作
当 addMessage 成功添加进 ChannelOutboundBuffer 后,就需要 flush 刷新到 Socket 中去。但是这个方法并不是做刷新到 Socket 的操作。而是将 unflushedEntry 的引用转移到 flushedEntry 引用中,表示即将刷新这个 flushedEntry,至于为什么这么做?
答:因为 Netty 提供了 promise,这个对象可以做取消操作,例如,不发送这个 ByteBuf 了,所以,在 write 之后,flush 之前需要告诉 promise 不能做取消操作了。
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
1、通过 unflushedEntry 获取未被刷新元素 entry。
2、如果 entry 为null 说明没有待刷新的元素,不执行任何操作
3、如果 entry 不为 null,说明有需要被刷新的元素
4、如果 flushedEntry == null 说明当前没有正在刷新的任务,则把 entry 设置为 flushedEntry 刷新的起点。
5、循环设置 entry, 设置这些 entry 状态设置为非取消状态,如果设置失败,则把这些entry 节点取消并使 totalPendingSize 减去这个节点的字节大小。
在调用完 outboundBuffer.addFlush() 方法后,Channel 会调用 flush0 方法做真正的刷新。代码参见 AbstractUnsafe.flush() 方法。
remove() 删除操作
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}
// recycle the entry
e.recycle();
return true;
}
1、获取 flushedEntry 节点,链表的头结点。如果获取不到清空 ByteBuf 缓存。
2、在链表上移除该 Entry。如果之前没有取消,只释放消息、通知和递减。
3、回收 Entry 对象。
removeEntry() 链表移除节点
private void removeEntry(Entry e) {
if (-- flushed == 0) {
// processed everything
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}
1、如果 flushed ==0 说明,链表中所有 flush 的数据都已经发送到 Socket 中。把 flushedEntry 置位 null。此时链表可能还有 unflushedEntry 数据。
如果此时 e == tailEntry 说明链表为空,则把 tailEntry 和 unflushedEntry 都置为空。
flush==0,可能是这种状态,如下图:
2、把 flushedEntry 置为下一个节点(flushedEntry 此时是头结点)。
Entry 对象池使用
由于 Entry 使用比较频繁,会频繁的创建和销毁,这里使用了 Entry 的对象池,创建的时候从缓存中获取,销毁时回收。
下面看下 Entry 的创建过程
Entry 创建
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
.....
使用 Entry.newInstance 方法进行创建 Entry 对象。
static final class Entry {
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@Override
protected Entry newObject(Handle<Entry> handle) {
return new Entry(handle);
}
};
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
Entry entry = RECYCLER.get();
entry.msg = msg;
entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
entry.total = total;
entry.promise = promise;
return entry;
}
Entry.newInstance() 方法收下从 Recycler 中获取 Entry 对象,如果获取不到则使用 Recycler.newObject() 方法创建一个 Entry 对象,调用 newObject() 方法在 Recycler.get() 方法中,代码如下:
public abstract class Recycler<T> {
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
从 Recycler 中可以看出,Entry 对象是存储在 Stack 中。如果 Stack 中没有可用的 Stack,则调用 newObject() 方法创建。
Entry 对象回收
public boolean remove() {
Entry e = flushedEntry;
...
e.recycle();
return true;
}
当 Entry 从链表中移除的时候回调用 e.recycle() 方法。
static final class Entry {
void recycle() {
next = null;
bufs = null;
buf = null;
msg = null;
promise = null;
progress = 0;
total = 0;
pendingSize = 0;
count = -1;
cancelled = false;
handle.recycle(this);
}
Entry.recycle() 方法会把 Entry 中成员变量全部初始化。然后在调用 handle.recycle() 方法。
public abstract class Recycler<T> {
static final class DefaultHandle<T> implements Handle<T> {
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
stack.push(this);
}
}
Recycler.recycle() 方法又把 Entry 对象压进 stack 中。
网友评论