美文网首页Apache Kafka
使用kafka收发大消息引发的思考

使用kafka收发大消息引发的思考

作者: 简单是美美 | 来源:发表于2019-06-26 10:20 被阅读0次

    1. 背景

      项目中有这样一个需求:两个服务之间需要传递视频监控平台的设备及分组信息。一个视频监控平台中通常有10万数量级的监控设备信息,每个设备的详细信息可能有二三十个字段,再加上分组信息,传递的信息量接近200M左右。以前使用发送多包消息的方式传递,这样发送方与接收方需要以事务的方式处理消息,如接收方没有收完整,就需要等待超时。如果传输过程中产生了丢包,则发送方需要重发,发送方与接收方的逻辑就变得比较复杂。
      目前我们使用kafka作为项目中的消息中间件,考虑将所有发送的设备信息打成一包消息收发,这样处理上变得简单。
      kafka官方推荐使用消息大小不超过1M时吞吐量最佳。要发送达200M的消息,需要在服务器端,及生产者端及消费者端做一些配置与策略。

    2. 配置

    2.1 生产者与消费者的配置

      需要设置生产者和消费者的配置以接收和发送大消息,在项目的springboot工程中,application.yml文件配置如下:

     kafka:
        producer:
          bootstrap-servers: 172.16.64.159:9092
          buffer-memory: 536870912
          properties:
            request.timeout.ms: 900000
            max.request.size: 536870912 
            compression.type: gzip 
          
          
        consumer:
          bootstrap-servers: 172.16.64.159:9092
          auto-offset-reset: latest
          properties:
            max.poll.records: 1
            request.timeout.ms: 900000
            fetch.max.bytes: 536870912
            max.partition.fetch.bytes: 536870912 
          enable-auto-commit: true
         
    

      这里生产者的buffer-memory被设为500M;消息的压缩类型被设置为gzip,可保证传输与持久化时压缩数据量;由于发送消息的时间可能会因消息量变长,因此请求超时时间设置为900秒。消费者的max.poll.records被设为1,表示每次只取一个消息;同样,fetch.max.bytes和max.partition.fetch.bytes都设置为500M。

    2.2 kafka服务器端配置

      kafka服务器 端的配置文件也需要修改,对server.properties配置文件配置项更改如下:

    message.max.bytes=536870912
    replica.fetch.max.bytes=536870912
    request.timeout.ms=900000
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=1024000
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=1024000
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=536870912
    

    3. 测试情况

      从测试情况来看,使用压缩机制以后,150M的JSON格式消息可压缩到十分之一,在应用服务与kafka服务器在同一局域网情况下,收发时间并不长。但在应用服务消费时,由于解压消息,反序列化消息到对象,基本上会消耗至少3倍消息的堆内存,使用jconsole监控,如下图所示:


    图1.png

      可以看到CPU和堆内存出现几次峰值的地方都是在消费大消息时产生,若JVM初始堆内存设置的不够大或已经使用较多,会出现OOM的错误,因此建议在每次消费大消息之后手工GC一下(system.gc())。

    4. 思考

      由于我们的项目每天只同步一次监控平台设备信息,因此kafka每天只收发大消息一次,系统还可支撑。如果要频繁收发这种方法不可取,容易引起应用OOM,和kafka的崩溃与效率低下。其实我认为使用折中方案,将应用的大消息按逻辑划分为多个包,包的数目在10个左右,包的大小在10M左右,这样在效率与系统稳定性方面会更好。

    相关文章

      网友评论

        本文标题:使用kafka收发大消息引发的思考

        本文链接:https://www.haomeiwen.com/subject/nzyjxctx.html