美文网首页
通过Redis消息队列实现大文件处理

通过Redis消息队列实现大文件处理

作者: 莫妮卡笔记 | 来源:发表于2020-12-08 17:56 被阅读0次
123465.jpg

一、故事背景
1、读取离线文件数据,再通过【离线数据】作为条件,查询第三方接口,返回最终的结果,再入库。
2、 业务逻辑是很简单,读取文件、查询接口、返回数据集、入库 四步。
3、业务特性:第三方接口调用400毫秒(ms) 。
如果用普通单线程去跑算500毫秒一个请求,一天也就跑8W多数据量,20多亿的数据不知道跑到猴年马月了。

二、处理方案
A) 初步方案采用ganymed-ssh2(文件都存储在Linux服务器上) 来读文件,Redis来存储消息、多线程来提升处理能力。
B) 流程图:

image.png
  • 1、ganymed-ssh2 调用 cat file 来读取文件,push到Redis队列中。
  • 2、消费者 lpop数据,通过线程池(ExecutorService) 创建多任务(Thread)消费数据。
  • 3、每个线程处理消息、逻辑包括调用第三方接口、入库操作。

三、呈现问题

  • 1、ganymed-ssh2 调用 cat file 来读取文件,读取到一半程序终止了怎么办?重新跑吗?

  • 2、Redis消息队列是一个单Key 、单Key又是存储在集群某个节点槽位中,如果消费者 消费太慢,造成消息堆积,量大的情况下很容易造成集群出现问题、而且其他节点存储利用不上。

  • 3、写Redis队列 是否可以批量、读Redis是否可以批量、而不是一条一条push/pop 。

四、优化问题

最终流程图:


image.png

1、 通过Redis做一个计数器 每读取一行记录数值,即使服务终止后,先从Redis读取这个数值
再通过cat指定行数开始读数据即可。

cat /big-data/file-20201127-1.txt | tail -n +2077681(指定行数)

2、通过取模拆Key 分片到不同小Key存储,降低单个节点存储压力,也充分利用了存储资源。

    String key = "big:data:" + Math.abs(String.valueOf(System.currentTimeMillis()).hashCode()% 10);

3、Redis Push 提供了批量方式(leftPushAll),可以指定读取行数再批量入库,而pop并没有提供批量 只能一个一个pop。

      //伪代码 指定行入库入库
        Map<String,List<String>> keyRowKeyMapping = new HashMap<>();
        List<String> list = new ArrayList<>();
                    while (true) {
                         if(list.size() == 10000){
                            addRowKeyToRedisByMap(keyRowKeyMapping);
                            list = new ArrayList<>();
                            keyRowKeyMapping = new HashMap<>();
            }
}
      //伪代码 批量入库
    private void addRowKeyToRedisByMap(Map<String,List<String>> mapping){
        if(mapping != null){
            for (Map.Entry<String, List<String>> entry : mapping.entrySet()) {
                redisTemplate.opsForList().leftPushAll(entry.getKey(), entry.getValue().toArray(new                           String[entry.getValue().size()]));
            }
        }
    }


4、消费者通过多线程pop、再分发到线程去处理。

//伪代码 创建线程池 、指定线程数量、pop出消息  通过线程去消费。
ExecutorService executorService = Executors.newFixedThreadPool(500);
        executorService.execute(()->{
            while (true){
                String key ="data:clean:" + Math.abs(String.valueOf(System.currentTimeMillis()).hashCode()% 10);
                Object message = redisTemplate.opsForList().rightPop(key,1,TimeUnit.SECONDS);
                logger.info("RedisConsumer value:{}",message);
                if(message == null){
                    continue;
                }
                executorService.execute(new DataCleanTask((String) message));

            }

        });

五、总结问题

  • 1、目前这套方案 500个线程在8核CPU机器上,一天可以处理6000万左右的数据,基本达到需求预期。

  • 2、代码实现真的很简单,去掉业务代码估计50行代码不到,就可以实现,简单轻便 只要机器性能能够满足,不管多大的数据量都能把CPU跑满,性能达到一定程度。

  • 3、队列做消费确认ACK比较麻烦,就是不能保证消费者在读取之后,未处理后的宕机问题。导致消息意外丢失 ,pop、多条数据没有简单的API支持、像Kafka还可以支持pop出指定毫秒数以内的数据返回。

 ConsumerRecords<String, String> records = kafkaConsumer.poll(2000);
  • 4、最终存储 如果是关系型数据库,业务允许的话最好是分表存储 。单表存储 性能可能会有问题需要调优了。

相关文章

网友评论

      本文标题:通过Redis消息队列实现大文件处理

      本文链接:https://www.haomeiwen.com/subject/crxqgktx.html