Unsafe
unsafe是不安全的意思,不要在应用程序里面直接使用Unsafe以及他的衍生类对象。Unsafe 在Channel定义,属于Channel的内部类,表明Unsafe和Channel密切相关。
interface Unsafe {
/**
* Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
* receiving data.
*/
RecvByteBufAllocator.Handle recvBufAllocHandle();
/**
* Return the {@link SocketAddress} to which is bound local or
* {@code null} if none.
*/
SocketAddress localAddress();
/**
* Return the {@link SocketAddress} to which is bound remote or
* {@code null} if none is bound yet.
*/
SocketAddress remoteAddress();
/**
* Register the {@link Channel} of the {@link ChannelPromise} and notify
* the {@link ChannelFuture} once the registration was complete.
*/
void register(EventLoop eventLoop, ChannelPromise promise);
/**
* Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
* it once its done.
*/
void bind(SocketAddress localAddress, ChannelPromise promise);
/**
* Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
* If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
* pass {@code null} to it.
*
* The {@link ChannelPromise} will get notified once the connect operation was complete.
*/
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
* operation was complete.
*/
void disconnect(ChannelPromise promise);
/**
* Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
* operation was complete.
*/
void close(ChannelPromise promise);
/**
* Closes the {@link Channel} immediately without firing any events. Probably only useful
* when registration attempt failed.
*/
void closeForcibly();
/**
* Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
* {@link ChannelPromise} once the operation was complete.
*/
void deregister(ChannelPromise promise);
/**
* Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
* {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.
*/
void beginRead();
/**
* Schedules a write operation.
*/
void write(Object msg, ChannelPromise promise);
/**
* Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
*/
void flush();
/**
* Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
* It will never be notified of a success or error and so is only a placeholder for operations
* that take a {@link ChannelPromise} as argument but for which you not want to get notified.
*/
ChannelPromise voidPromise();
/**
* Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
*/
ChannelOutboundBuffer outboundBuffer();
Usafe接口继承及实现关系下类图:
image.pngNioUnsafe 在 Unsafe基础上增加了以下几个接口
public interface NioUnsafe extends Unsafe {
/**
* Return underlying 可以访问底层jdk
*/
SelectableChannel ch();
/**
* Finish connect
*/
void finishConnect();
/**
* Read from underlying {@link SelectableChannel}
*/
void read();
void forceFlush();
}
- 从增加的接口以及类名上来看,NioUnsafe 增加了可以访问底层jdk的SelectableChannel的功能,定义了从SelectableChannel读取数据的read方法。AbstractUnsafe 实现了大部分Unsafe的功能。AbstractNioUnsafe 主要是通过代理到其外部类AbstractNioChannel拿到了与jdk nio相关的一些信息,比如SelectableChannel,SelectionKey等等。
- NioSocketChannelUnsafe 和 NioByteUnsafe 放到一起讲,其实现了IO的基本操作,读,和写,这些操作都与jdk底层相关
Unsafe的分类
继承结构来看,我们可以总结出两种类型的Unsafe分类,
- 与连接的字节数据读写相关的
NioByteUnsafe
- 与新连接建立操作相关的
NioMessageUnsafe
NioByteUnsafe中的读:委托到外部类NioSocketChannel
已知,boss线程主要负责监听并处理accept事件,将socketChannel注册到work线程的selector,由worker线程来监听并处理read事件。当work线程的selector检测到OP_READ事件发生时,触发read操作。
//NioEventLoop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
unsafe.read()
直接调转到unsafe的接口NioByteUnsafe类中的read()方法,此方法的实现在类AbstractNioByteChannel中:
@Override
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
//负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator); // 申请一块指定大小的内存。
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf); // 触发事件,将会引发pipeline的读事件传播
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
(1) allocHandle//\负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少;
allocHandle的初始化类为AdaptiveRecvByteBufAllocator
;
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
static final int DEFAULT_MINIMUM = 64; //最小缓存(64),在SIZE_TABLE中对应的下标为3。
static final int DEFAULT_INITIAL = 1024; //初始化缓存大小,第一次分配缓存时,
//由于没有上一次实际收到的字节数做参考,需要给一个默认初始值。
static final int DEFAULT_MAXIMUM = 65536; //最大缓存(65536),在SIZE_TABLE中对应的下标为38。
private static final int INDEX_INCREMENT = 4; //上次预估缓存偏小,下次index的递增值。
private static final int INDEX_DECREMENT = 1; //上次预估缓存偏大,下次index的递减值。
private static final int[] SIZE_TABLE; //按照从小到大的顺序预先存储可以分配的缓存大小。
//从16开始,每次累加16,直到496,接着从512开始,每次增大一倍,直到溢出。
static {
List<Integer> sizeTable = new ArrayList<Integer>();
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
for (int i = 512; i > 0; i <<= 1) {
sizeTable.add(i);
}
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
(2) allocHandle.allocate(allocator)
申请一块指定大小的内存。
//DefaultMaxBytesRecvByteBufAllocator.HandleImpl
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(nextReceiveBufferSize);
}
通过ByteBufAllocator的ioBuffer方法申请缓存,调用AbstractByteBufAllocator类中的方法ioBuffer:
@Override
public ByteBuf ioBuffer(int initialCapacity) {
if (PlatformDependent.hasUnsafe()) { //判断平台是否支持unsafe
return directBuffer(initialCapacity); //直接物理内存
}
return heapBuffer(initialCapacity); //堆上内存
}
根据平台是否支持unsafe,选择使用直接物理内存还是堆上内存。
directBuffer方案:
//类AbstractByteBufAllocator
@Override
public ByteBuf directBuffer(int initialCapacity) {
return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
newDirectBuffer
方法调用到UnpooledByteBufAllocator
类中:
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = PlatformDependent.useDirectBufferNoCleaner() ?
new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
Netty中使用引用计数机制来管理资源,ByteBuf实现了ReferenceCounted接口,当实例化一个ByteBuf时,引用计数为1, 代码中需要保持一个该对象的引用时需要调用retain方法将计数增1,对象使用完时调用release将计数减1。当引用计数变为0时,对象将释放所持有的底层资源或将资源返回资源池。
(3) 方法doReadBytes(byteBuf)
将socketChannel数据写入缓存。
//NioSocketChannel
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
继续调用:
@Override
public ByteBuf writeBytes(ByteBuf src, int length) {
if (length > src.readableBytes()) {
throw new IndexOutOfBoundsException(String.format(
"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
}
writeBytes(src, src.readerIndex(), length);
src.readerIndex(src.readerIndex() + length);//读取src数据到this.ByteBuf
return this;
}
@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureAccessible();
//是否扩容处理
ensureWritable(length);
//调用子类实现
setBytes(writerIndex, src, srcIndex, length);
//记录已写长度
writerIndex += length;
return this;
}
总结:
1.writeBytes跟setBytes、readBytes跟getBytes区别是前者有记录,后者没有,而后者是子类的实现
2.扩容算法是两种策略:
大于4M时不走double自增,数值范围取 minNewCapacity <= maxCapacity
少于4M时从64开始double自增
3.更改容量也是每个子类实现,要考虑两种情况
大于当前容量
小于当前容量,当小于的时候要考虑 readerIndex、writerIndex边界,当超过 readerIndex、writerIndex边界heap的策略是丢去原来的数据
4.heap是继承 AbstractReferenceCountedByteBuf的,当refCnt记录为1时释放数据
=============================
ByteBuf
Nio ByteBuffer 和 Netty ByteBuf 对比
1 指针:
ByteBuffer
例如下面使用buffer的例子:
public class Test2 {
public static void main(String[] args) {
String content = "abcdefg";
ByteBuffer byteBuffer = ByteBuffer.allocate(256);
byteBuffer.put(content.getBytes());
byteBuffer.flip();
byte[] bufferValue = new byte[byteBuffer.remaining()];
byteBuffer.get(bufferValue);
System.out.println(new String(bufferValue));
}
}
ByteBuffer中会有三个下标,初始位置0,当前位置positon,limit位置,初始时,position为0,limit为Buffer数组末尾
调用buffer.put(value.getBytes())后:
image.png
不调用flip:
从缓冲区读取的是position — limit位置的数据,明显不是我们要的
调用flip:
会将limit设置为position,position设置为0,,此时读取的数据 :
image.png
比较关键的代码 byteBuffer.flip();它会把limit设置为position的位置。否则读取到的将会是错误的内容。
ByteBuf:
ByteBuf中使用两个指针,readerIndex,writerIndex来指示位置,初始时readrIndex = writerIndex = 0,当写入数据后:
image.png
writerIndex — capacity:可写容量
readerIndex — writerIndex:可读部分
当读取了M个字节后:
image.png
调用discardReadBytes,会释放掉discardReadBytes的空间,并把readableBytes复制到从0开始的位置,因此这里会发生内存复制,频繁调用会影响性能
image.png2 扩容
ByteBuffer
ByteBuffer缓冲区的长度固定,分多了会浪费内存,分少了存放大的数据时会索引越界,所以使用ByteBuffer时,为了解决这个问题,我们一般每次put操作时,都会对可用空间进行校检,如果剩余空间不足,需要重新创建一个新的ByteBuffer,然后将旧的ByteBuffer复制到新的ByteBuffer中去。
ByteBuf:
而ByteBuf则对其进行了改进,它会自动扩展,具体的做法是,写入数据时,会调用ensureWritable方法,传入我们需要写的字节长度,判断是否需要扩容,然后使用calculateNewCapacity进行扩容:
image.png image.png
网友评论