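Netty UDP Datagram Truncation Problem
Problem
- I was recently writing a syslog UDP log receiver and found that oversized log messages were truncated on receipt, so the data I got was incomplete.
Source code trace
Sample code that creates the UDP server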
def b = new Bootstrap()
group = new NioEventLoopGroup()
b.group(group)
        .channel(NioDatagramChannel.class)
        .localAddress(config.udp.port)
        .handler(new ChannelInitializer<DatagramChannel>() {
            @Override
            protected void initChannel(DatagramChannel datagramChannel) throws Exception {
                ChannelPipeline channelPipeline = datagramChannel.pipeline()
                channelPipeline.addLast(
                        new UDPSyslogMessageDecoder(),
                        new SyslogMessageHandler()
                )
                if (config.log) {
                    channelPipeline.addLast(new SyslogMessageLogHandler())
                }
                channelPipeline.addLast(new QianxinLogHandler(vertx, config))
            }
        })
startFuture = b.bind().sync()
Tracing NioDatagramChannel initialization in the source
The channel is NioDatagramChannel, so trace the NioDatagramChannel source.
- Initialize NioDatagramChannel
io.netty.channel.socket.nio.NioDatagramChannel#NioDatagramChannel()
public NioDatagramChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
- This calls the overloaded constructor
io.netty.channel.socket.nio.NioDatagramChannel#NioDatagramChannel(java.nio.channels.DatagramChannel)
public NioDatagramChannel(DatagramChannel socket) {
    super(null, socket, SelectionKey.OP_READ);
    config = new NioDatagramChannelConfig(this, socket);
}
- Initialize the NioDatagramChannelConfig configuration
io.netty.channel.socket.nio.NioDatagramChannelConfig#NioDatagramChannelConfig
NioDatagramChannelConfig(NioDatagramChannel channel, DatagramChannel javaChannel) {
    super(channel, javaChannel.socket());
    this.javaChannel = javaChannel;
}
- This calls the parent class constructor
io.netty.channel.socket.DefaultDatagramChannelConfig#DefaultDatagramChannelConfig
public DefaultDatagramChannelConfig(DatagramChannel channel, DatagramSocket javaSocket) {
    // Initializes a fixed-size receive buffer allocator of 2048 bytes
    super(channel, new FixedRecvByteBufAllocator(2048));
    this.javaSocket = ObjectUtil.checkNotNull(javaSocket, "javaSocket");
}
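Cause
The default receive buffer of NioDatagramChannel is only 2048 bytes. In a Syslog UDP service, log messages are often larger than that, so any datagram over 2048 bytes has its tail cut off, which is the truncation described above.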
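The same effect can be reproduced without Netty. The sketch below uses only the JDK's DatagramSocket: it sends one datagram over loopback and receives it into a buffer of a chosen size. The class name UdpTruncationDemo, the method receivedLength, and the 3000-byte payload are illustrative assumptions and do not come from the original project. It shows only the underlying UDP behaviour (the part of a datagram that does not fit the receive buffer is dropped), not Netty's internal code path.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;

public class UdpTruncationDemo {

    // Sends one datagram of payloadSize bytes over loopback and returns how many
    // bytes the receiver actually gets when its buffer holds bufferSize bytes.
    static int receivedLength(int payloadSize, int bufferSize) throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(new InetSocketAddress(loopback, 0));
             DatagramSocket sender = new DatagramSocket()) {
            // Fail with a timeout instead of blocking forever if the datagram is lost.
            receiver.setSoTimeout(2000);

            byte[] payload = new byte[payloadSize];
            sender.send(new DatagramPacket(payload, payload.length,
                    loopback, receiver.getLocalPort()));

            // Anything beyond bufferSize bytes is discarded by the receive call.
            DatagramPacket packet = new DatagramPacket(new byte[bufferSize], bufferSize);
            receiver.receive(packet);
            return packet.getLength();
        }
    }

    public static void main(String[] args) throws Exception {
        // A buffer the size of Netty's default versus a larger one.
        System.out.println("2048-byte buffer:  " + receivedLength(3000, 2048));
        System.out.println("65535-byte buffer: " + receivedLength(3000, 65535));
    }
}
```

With a 2048-byte buffer the receiver sees only the first 2048 bytes of the 3000-byte datagram, and the rest cannot be read afterwards. With a buffer at least as large as the datagram, the whole payload arrives.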
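Solution
When building the Bootstrap, add a channel option that replaces the default fixed 2048-byte receive buffer with a larger one.
The actual construction code is as follows: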
def b = new Bootstrap()
group = new NioEventLoopGroup()
b.group(group)
        .channel(NioDatagramChannel.class)
        .localAddress(config.udp.port)
        // Add the line below: a fixed receive buffer of about 64 KB (65535 bytes); adjust to your actual needs
        .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
        .handler(new ChannelInitializer<DatagramChannel>() {
            @Override
            protected void initChannel(DatagramChannel datagramChannel) throws Exception {
                ChannelPipeline channelPipeline = datagramChannel.pipeline()
                channelPipeline.addLast(
                        new UDPSyslogMessageDecoder(),
                        new SyslogMessageHandler()
                )
                if (config.log) {
                    channelPipeline.addLast(new SyslogMessageLogHandler())
                }
                channelPipeline.addLast(new QianxinLogHandler(vertx, config))
            }
        })
startFuture = b.bind().sync()
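When adjusting the buffer size, it may help to know the upper bound. Over IPv4, the total packet length is a 16-bit field (at most 65535 bytes), and the minimal IPv4 header (20 bytes) and the UDP header (8 bytes) come out of that, so a single UDP payload cannot exceed 65507 bytes. A 65535-byte buffer is therefore always large enough. The sketch below works through this arithmetic. The class UdpPayloadLimits and the helper recvBufferSizeFor are hypothetical names introduced for illustration, and clamping the value to Netty's 2048-byte default as a lower bound is an assumption, not a Netty requirement.

```java
public class UdpPayloadLimits {

    static final int MAX_IPV4_PACKET = 65535; // 16-bit IPv4 total-length field
    static final int IPV4_HEADER = 20;        // minimal IPv4 header, no options
    static final int UDP_HEADER = 8;          // fixed UDP header size

    // Largest payload a single UDP datagram can carry over IPv4.
    static int maxUdpPayloadIpv4() {
        return MAX_IPV4_PACKET - IPV4_HEADER - UDP_HEADER;
    }

    // Hypothetical helper: picks a receive buffer size for the largest message
    // you expect, never below 2048 and never above the IPv4 UDP payload limit.
    static int recvBufferSizeFor(int largestExpectedMessage) {
        int atLeastDefault = Math.max(largestExpectedMessage, 2048);
        return Math.min(atLeastDefault, maxUdpPayloadIpv4());
    }

    public static void main(String[] args) {
        System.out.println(maxUdpPayloadIpv4());
        System.out.println(recvBufferSizeFor(8192));
    }
}
```

The result of recvBufferSizeFor could be passed to new FixedRecvByteBufAllocator(...) in place of the literal 65535 if a smaller per-read buffer is preferred.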