![](https://img.haomeiwen.com/i11168436/6453c1f0fc687937.jpg)
一、故事背景
1、读取离线文件数据,再通过【离线数据】作为条件,查询第三方接口,返回最终的结果,再入库。
2、 业务逻辑是很简单,读取文件、查询接口、返回数据集、入库 四步。
3、业务特性:第三方接口调用400毫秒(ms) 。
如果用普通单线程去跑算500毫秒一个请求,一天也就跑8W多数据量,20多亿的数据不知道跑到猴年马月了。
二、处理方案
A) 初步方案采用ganymed-ssh2(文件都存储在Linux服务器上) 来读文件,Redis来存储消息、多线程来提升处理能力。
B) 流程图:
![](https://img.haomeiwen.com/i11168436/6de2e2bfb640d0ea.png)
- 1、ganymed-ssh2 调用 cat file 来读取文件,push到Redis队列中。
- 2、消费者 lpop数据,通过线程池(ExecutorService) 创建多任务(Thread)消费数据。
- 3、每个线程处理消息、逻辑包括调用第三方接口、入库操作。
三、呈现问题
-
1、ganymed-ssh2 调用 cat file 来读取文件,读取到一半程序终止了怎么办?重新跑吗?
-
2、Redis消息队列是一个单Key 、单Key又是存储在集群某个节点槽位中,如果消费者 消费太慢,造成消息堆积,量大的情况下很容易造成集群出现问题、而且其他节点存储利用不上。
-
3、写Redis队列 是否可以批量、读Redis是否可以批量、而不是一条一条push/pop 。
四、优化问题
最终流程图:
![](https://img.haomeiwen.com/i11168436/0195c40517b6beb4.png)
1、 通过Redis做一个计数器 每读取一行记录数值,即使服务终止后,先从Redis读取这个数值
再通过cat指定行数开始读数据即可。
cat /big-data/file-20201127-1.txt | tail -n +2077681(指定行数)
2、通过取模拆Key 分片到不同小Key存储,降低单个节点存储压力,也充分利用了存储资源。
String key = "big:data:" + Math.abs(String.valueOf(System.currentTimeMillis()).hashCode()% 10);
3、Redis Push 提供了批量方式(leftPushAll),可以指定读取行数再批量入库,而pop并没有提供批量 只能一个一个pop。
//伪代码 指定行入库入库
Map<String,List<String>> keyRowKeyMapping = new HashMap<>();
List<String> list = new ArrayList<>();
while (true) {
if(list.size() == 10000){
addRowKeyToRedisByMap(keyRowKeyMapping);
list = new ArrayList<>();
keyRowKeyMapping = new HashMap<>();
}
}
//伪代码 批量入库
private void addRowKeyToRedisByMap(Map<String,List<String>> mapping){
if(mapping != null){
for (Map.Entry<String, List<String>> entry : mapping.entrySet()) {
redisTemplate.opsForList().leftPushAll(entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]));
}
}
}
4、消费者通过多线程pop、再分发到线程去处理。
//伪代码 创建线程池 、指定线程数量、pop出消息 通过线程去消费。
ExecutorService executorService = Executors.newFixedThreadPool(500);
executorService.execute(()->{
while (true){
String key ="data:clean:" + Math.abs(String.valueOf(System.currentTimeMillis()).hashCode()% 10);
Object message = redisTemplate.opsForList().rightPop(key,1,TimeUnit.SECONDS);
logger.info("RedisConsumer value:{}",message);
if(message == null){
continue;
}
executorService.execute(new DataCleanTask((String) message));
}
});
五、总结问题
-
1、目前这套方案 500个线程在8核CPU机器上,一天可以处理6000万左右的数据,基本达到需求预期。
-
2、代码实现真的很简单,去掉业务代码估计50行代码不到,就可以实现,简单轻便 只要机器性能能够满足,不管多大的数据量都能把CPU跑满,性能达到一定程度。
-
3、队列做消费确认ACK比较麻烦,就是不能保证消费者在读取之后,未处理后的宕机问题。导致消息意外丢失 ,pop、多条数据没有简单的API支持、像Kafka还可以支持pop出指定毫秒数以内的数据返回。
ConsumerRecords<String, String> records = kafkaConsumer.poll(2000);
- 4、最终存储 如果是关系型数据库,业务允许的话最好是分表存储 。单表存储 性能可能会有问题需要调优了。
网友评论