Netty如何发送消息(writeAndFlush)
作者:
简书徐小耳 | 来源:发表于
2019-01-18 23:56 被阅读78次
- 1.主要就是调用writeAndFlush,注意这边可以调用ChannelHandlerContext的,也可以调用channel的
- 2.上述2种方式的主要区别就是前者是从当前handler流转到head节点,而后者则送从tail节点到head
- 3.首先还得提前申明下该方法是异步,即我们方法执行结束的时候消息很有可能还在我们的buffer里面 未通过socket发送到对端,接下来我们详细剖析该方法。
- 4.首先调用该方法需要2个参数,一个是object类型的消息,还有一个ChannelPromise。如果后者我们不设置,则 默认设置一个DefaultChannelPromise,内部持有当前的channel和loop。
- 5.然后检测我们的promise,为空抛出异常,是否已经done,如果已经done但是未取消则抛出异常,如果已经done且取消了则直接返回true,再继续检测promise携带的channel是否与当前的channel一样,不一样也抛出异常。还要检测promise的class类型是否等于DefaultChannelPromise,如果是则直接返回,否则需要进一步检测primose是否为VoidChannelPromise或者CloseFuture,如果是可能需要抛出异常。
- 6.当上述检测到无效的promise的时候会直接返回,并且如果我们的message是ReferenceCounted类型的则调用其release方法
- ReferenceCounted是netty自己维护的一个计数方式,用来管理对象的生命周期
- 8.从当前的context寻找其前面的ChannelHandlerContext且outbound=true,outbound=true代表这个context里面的handler是ChannelOutboundHandler
- 9.里面涉及到我们 ReferenceCounted 的touch方法,其实只是简单方便我们deubugger,如果我们byteBuf存在泄漏我们可以debugger观察
- 10.netty一般使用ResourceLeakDetector来检测bytebuf的是否泄漏(所谓的泄漏就是我们不使用该对象,但是其refCnt不为0,从而导致该bytebuf不能释放)
- 11.如果我们想使用ResourceLeakDetector来检测,我们必须使用ByteBufAllocator来获取bytebuf,而且目前只支持堆外内存(池化和非池化)和堆内池化,但是堆内非池化不支持检测。
- 12.ResourceLeakDetector检测的原理是我们每次使用ByteBufAllocator创建bytebuf会根据我们的泄漏检测级别来进行。如果是DISABLED则直接返回,如果是小于PARANOID级别的则我们使用随机数来抽样代表每次创建是否需要reportLeak,
- 13.最后我们需要创建一个DefaultResourceLeak对象,这个对象会存放在ConcurrentMap,key就是我们的DefaultResourceLeak 其是虚引用(我们知道我们的虚引用是指当引用对象被gc,则该虚引用则会被加入一个队列对象
而我们所谓的泄漏是指我们的bytebuf对象被回收了但是其内部资源对象没有回收,导致即无法使用该内存且该内存也不会被回收进而导致内存泄漏。内部对象存储数据一般是数组或者directBytebuffer),而value是LeakEntry
其只是简单的作为一个value,同时我们还获取我们的trackedHash=我们bytebuf的hash值。
- 14.我们的reportLeak主要就是检测我们的logger是否可用,如果不可用我们的logger那就无法打印泄漏信息,那么我们需要清空我们的队列,如果logger可用则将虚引用取出来把该bytbuf的基本信息打印出来
- 15.上面说完了内存泄漏的问题原理,我们这边继续探讨如何发送消息,我们在获取到ChannelOutboundHandler后调用其invokeWriteAndFlush方法
- 16.上面方法先调用invokeWrite0方法然后再调用invokeFlush0。
- 17.invokeWrite0方法最终调用ChannelOutboundHandler的write方法,在这边我们还有一个需要注意我们传递的消息可以是object对象,但是其底层还是directBytebuffer,如果我们刚开始不是传递这个消息对象,那么需要自己编写一个handler进行编码操作这个我们会在后续再细说。
- 18.最终调用到了我们的headContext的write也就是AbstractUnsafe的write方法,而invokeFlush0最终调用的是AbstractUnsafe的flush方法
- 19.write方法是首先获取当前channel里面的ChannelOutboundBuffer,然后会判断当前要传递的对象是否堆外内存或者FileRegion则可以直接发送,如果是bytebuf但是是非堆外内存则包装成堆外内存,如果以上都不是则直接抛出异常。
- 20.然后我们评估下我们的要发送的内容大小,包装成Entry对象。评估大小的方式:如果是ByteBuf或者ByteBufHolder,则 直接获取发送字节的大小,如果是fileReginon则直接返回0,其他返回unknownSize(默认是8)
- 21.我们的entry会最终存放到entry链表,有三个链表对象:tailEntry,flushedEntry,unflushedEntry.
- 22.我们刚开始会把第一个entry设置为tailEntry和unflushedEntry,当我们执行addflush方法就是将unflushedEntry的链表的节点,转移到flushedEntry链表下,即flushedEntry链表保存的是即将要发送的entry节点,而我们每次调用addMessge都是将entry对象添加到当前tailEntry的下一个。
- 23.flush的时候会观察是否存在FlushPending,即看当前的selectionKey是否有效以及当前的channel是否注册了write事件。当key有效且注册了write事件则代表当前的channel已经写满了需要等channel有空闲,我们需要等待我们write事件触发了再真正调用flush(写事件的就绪的本质是只要channel不为空闲就会触发写事件)
- 24.首先需要判断inFlush0,如果为true则代表当前flush方法正在调用则直接返回,然后判断调用NioSocketChannel的doWrite方法
- 25.该write方法首先获取writeSpinCount,即单个消息最多循环发送的次数,然后将我们的bytebuf组装ByteBuffer的数组,目前默认最大的是1024,数组的所有数据最大值不大于maxBytesPerGatheringWrite,当然不一定不超过,只是尽量不超过。然后每次发送的时候检测发送的字节大小如果等于0代表channel满了,满了就注册写事件等到channel有buffer可写数据即可
- 26.如果成功一次性写完则尝试取消写事件并设置一个直接调用写事件的任务这样就算我们在此期间没有任何事件发生,我们还可以继续去将channelOutboundbuffer里面的entry写出去了。
- 27至此整个写事件都已经描述完成了
本文标题:Netty如何发送消息(writeAndFlush)
本文链接:https://www.haomeiwen.com/subject/kccodqtx.html
网友评论