1. 背景
项目中有这样一个需求:两个服务之间需要传递视频监控平台的设备及分组信息。一个视频监控平台中通常有10万数量级的监控设备信息,每个设备的详细信息可能有二三十个字段,再加上分组信息,传递的信息量接近200M左右。以前使用发送多包消息的方式传递,这样发送方与接收方需要以事务的方式处理消息,如接收方没有收完整,就需要等待超时。如果传输过程中产生了丢包,则发送方需要重发,发送方与接收方的逻辑就变得比较复杂。
目前我们使用kafka作为项目中的消息中间件,考虑将所有发送的设备信息打成一包消息收发,这样处理上变得简单。
kafka官方推荐使用消息大小不超过1M时吞吐量最佳。要发送达200M的消息,需要在服务器端,及生产者端及消费者端做一些配置与策略。
2. 配置
2.1 生产者与消费者的配置
需要设置生产者和消费者的配置以接收和发送大消息,在项目的springboot工程中,application.yml文件配置如下:
kafka:
producer:
bootstrap-servers: 172.16.64.159:9092
buffer-memory: 536870912
properties:
request.timeout.ms: 900000
max.request.size: 536870912
compression.type: gzip
consumer:
bootstrap-servers: 172.16.64.159:9092
auto-offset-reset: latest
properties:
max.poll.records: 1
request.timeout.ms: 900000
fetch.max.bytes: 536870912
max.partition.fetch.bytes: 536870912
enable-auto-commit: true
这里生产者的buffer-memory被设为500M;消息的压缩类型被设置为gzip,可保证传输与持久化时压缩数据量;由于发送消息的时间可能会因消息量变长,因此请求超时时间设置为900秒。消费者的max.poll.records被设为1,表示每次只取一个消息;同样,fetch.max.bytes和max.partition.fetch.bytes都设置为500M。
2.2 kafka服务器端配置
kafka服务器 端的配置文件也需要修改,对server.properties配置文件配置项更改如下:
message.max.bytes=536870912
replica.fetch.max.bytes=536870912
request.timeout.ms=900000
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1024000
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1024000
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=536870912
3. 测试情况
从测试情况来看,使用压缩机制以后,150M的JSON格式消息可压缩到十分之一,在应用服务与kafka服务器在同一局域网情况下,收发时间并不长。但在应用服务消费时,由于解压消息,反序列化消息到对象,基本上会消耗至少3倍消息的堆内存,使用jconsole监控,如下图所示:
图1.png
可以看到CPU和堆内存出现几次峰值的地方都是在消费大消息时产生,若JVM初始堆内存设置的不够大或已经使用较多,会出现OOM的错误,因此建议在每次消费大消息之后手工GC一下(system.gc())。
4. 思考
由于我们的项目每天只同步一次监控平台设备信息,因此kafka每天只收发大消息一次,系统还可支撑。如果要频繁收发这种方法不可取,容易引起应用OOM,和kafka的崩溃与效率低下。其实我认为使用折中方案,将应用的大消息按逻辑划分为多个包,包的数目在10个左右,包的大小在10M左右,这样在效率与系统稳定性方面会更好。
网友评论