美文网首页
Netty的高低水位

Netty的高低水位

作者: 赵信信官属 | 来源:发表于2020-11-04 00:32 被阅读0次

假如我们的底层使用Netty作为网络通信框架,业务流程在将业务数据发送到对端之前,实际先要将数据发送到Netty的缓冲区中,然后再从Netty的缓冲区发送到TCP的缓冲区,最后再到对端.

图片.png

业务数据不可能无限制向Netty缓冲区写入数据,TCP缓冲区也不可能无限制写入数据.Netty通过高低水位控制向Netty缓冲区写入数据的多少.

它的大体流程就是向Netty缓冲区写入数据的时候,会判断写入的数据总量是否超过了设置的高水位值,如果超过了就设置通道(Channel)不可写状态.
当Netty缓冲区中的数据写入到TCP缓冲区之后,Netty缓冲区的数据量变少,当低于低水位值的时候,就设置通过(Channel)可写状态.

介绍代码

// 代码位置: io.netty.channel.DefaultChannelPipeline.HeadContext#write​
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
// 代码位置: io.netty.channel.AbstractChannel.AbstractUnsafe#write
@Override
public final void write(Object msg, ChannelPromise promise) {
    
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    // 数据​写入到Netty缓冲区
    outboundBuffer.addMessage(msg, size, promise);
}

// 代码位置: io.netty.channel.ChannelOutboundBuffer#addMessage
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
​
    // 高水位判断​
    incrementPendingOutboundBytes(entry.pendingSize, false);
}​
// 代码位置: io.netty.channel.ChannelOutboundBuffer#incrementPendingOutboundBytes(long, boolean)
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
​
​    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // 如果Netty缓冲区的数据总量已经超过高水位,则设置不可写状态​
    ​if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}
// 默认的高低水位值
private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;

以上就是写入数据时候的简单流程,以及通过高水位值控制不可写状态.
当调用刷新flush方法时,通过低水位值控制可写状态,此处就不贴代码了,关于flush流程之前的文章有讲过.

之后我们就可以判断通过(Channel)是否可写

// 代码位置: io.netty.channel.AbstractChannel#isWritable​
​@Override
public boolean isWritable() {
    ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    return buf != null && buf.isWritable();
}

虽然Netty提供了这个一个高低水位的机制,控制向Netty缓冲区写入数据.但是我们可以忽略它,依然可以向Netty缓冲区写入数据,但是遗憾的是,TCP缓冲区大小是固定的,总会达到某个时间点,Netty不能向TCP缓冲区写入数据了.这个时候我们就需要注册通过可写事件了(一般情况,写操作是不需要注册可写事件的).

RocketMQ的底层使用Netty进行网络通信,我们看下RocketMQ是如何利用它的.

// 代码位置: org.apache.rocketmq.broker.client.ProducerManager#getAvaliableChannel
​
// 选择一个可用的通道
public Channel getAvaliableChannel(String groupId) {
    HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
    List<Channel> channelList = new ArrayList<Channel>();
    if (channelClientChannelInfoHashMap != null) {
        for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
            channelList.add(channel);
        }
        int size = channelList.size();
        if (0 == size) {
            log.warn("Channel list is empty. groupId={}", groupId);
            return null;
        }
​
        int index = positiveAtomicCounter.incrementAndGet() % size;
        Channel channel = channelList.get(index);
        int count = 0;
        // 判断通道是否活跃以及是否可写状态​
        boolean isOk = channel.isActive() && channel.isWritable();
        while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
            if (isOk) {
                return channel;
            }
            index = (++index) % size;
            channel = channelList.get(index);
            isOk = channel.isActive() && channel.isWritable();
        }
    } else {
        log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
        return null;
    }
    return null;
}

公众号 Netty历险记

相关文章

  • Netty的高低水位

    假如我们的底层使用Netty作为网络通信框架,业务流程在将业务数据发送到对端之前,实际先要将数据发送到Netty的...

  • netty工程经验tips

    netty服务端高低水位设置配合channelWritabilityChanged设置autoRead做到保护 n...

  • 高低水位

    If the number of bytes queued in the write buffer exceeds...

  • 探究之路(三)

    1、水轮转的快慢 第一种情况:水位高低不同,小水轮转动快慢不同。 结论:水位高的,小水轮转动快 第二种情况:水...

  • 水位开关是什么

    水位开关分为:电容式水位开关、电子式水位开关、电极式水位开关、光电式水位开关、音叉式水位开关、浮球式水位开关等。电...

  • 修复 netty 高水位溢出问题记录

    导数据时频繁出现maybe write overflow的问题,在这种状态下能持续地运行一段时间,但是过一会lea...

  • 大班科学《乌鸦喝水》

    设计意图: 说活动目标: 1.知识目标:初步感知瓶子里水位高低变化与水量和石子的大小数量的关系。 2.能力目标:乐...

  • 《水位》

    梦中的水位上升 你的呼吸开始困难 被淹没的恐惧与生俱来 如同山峰的恐惧 像歌唱家一样 你努力寻找高音的位置 在登高...

  • 《水位》

    梦中的水位上升 你的呼吸开始困难 被淹没的恐惧与生俱来 如同山峰的恐惧 像歌唱家一样 你努力寻找高音的位置 在登高...

  • 水位

    7月20日,周一,多云 下午,带宝宝到秦淮河边走路。半个月不见,秦淮河的水位高了很多,三层走路道全部被水淹没,如果...

网友评论

      本文标题:Netty的高低水位

      本文链接:https://www.haomeiwen.com/subject/zbaivktx.html