最近遇到一个问题,有两个日志流(kafka)需要进行拼接,但是两个日志流并不是同步进行,而是其中一个日志流存在10分钟左右的延迟;所庆幸的是其中一个日志流只有标签数据数量较小,但是另一个日志流会较大,10分钟可以达到T级别的数量;
问题
问题实际上只有一个就是数量巨大,这会带来几个问题
1、存储问题:需要一个大容量的地方缓冲>10分钟的数据(两个队列数据),并且支持查询(join的时候需要查询出来),并且数据最好可以过期(除非家中有矿,可以持续存储这种数据)
2、查询问题:除了存储问题,实现时肯定是遍历一个队列数据然后查询另一个数据是否到达,这就要求存储需要额外提供一种遍历的能力,应该是基于时间范围遍历的一种能力
3、拼接时机问题:两个队列的在拼接时有两个方案可以选择
a、实时拼接:即两个日志中只要存在匹配数据,就拼接完成然后下发数据;这个实时性较好,但是要求存储系统会有一定压力;但是实时拼接的前提是,确定什么状态才能确定已经拼接完成;
b、延时拼接:即等到超时时间(例如:等10分钟再进行拼接)在处理数据;这个实时性差一些,但是压力会加少很多,只要存储提供基本的遍历能力即可;
解决思路
业务上无法确定拼接完成的状态(需要等很多很多数据到达才行),因此只能硬生生的缓存这10分钟的数据,这个当然不是个好消息;
解决定位
我将这个定义为一个延迟队列问题,而不是一个多线程等待问题,主要基于以下几点:
a、多线程等待问题,实际上是任务在执行等待完成,而不是等待执行
b、多线程等待问题的前提是,资源足够,即有内存和cpu资源足够;显然这个任务得家里有矿才行
c、多线程等待问题是为提高处理效率,而这个任务多线程只会降低资源利用率
大堵小存
实际上主要的解决思路是:大堵小存;这个是基于kafka的一个解决思路,也是快速解决业务问题的一个便宜方法,基于现有资源;当然,flink也是一个解决方案,这个将后续讨论,下面将对方案进行解释
a、将小数据存储起来,根据消息实时更新存储;数据存储的方案很多,数据量较小,选择空间很大;
b、将大数据阻塞到kafka中,这是kafka允许的,当然会给kafka代码一部分压力;数据到达后去查询存储中的数据,完成数据的拼接;
c、本地有序队列,这个实际上是MQ的一个副作用,队列主要解决两个问题:
1、业务有序问题;kafka只能保证了投递有序(即指定partition的数据投递时间有序),但是业务上时间无法保证(这个确实是存在的,有可能是发送时失败或者处理任务阻塞等)
2、partition互相影响问题:基础架构组没有提供可以分partition消费的方案,而只提供来kafka提供的stream的方案;会导致如果一个实例中消费多余1个partition,就会出现数据量大的partition(partition很难消息数据完全一致的,即使一致也会因为耗时等问题导致资源倾斜)会不断的侵占资源,导致影响另一个partition的消耗;最后的表现为同一个实例,不同的partition的lag差距较大
本地有序队列
本地会实现一个基于业务时间的有序队列,主要目的已经在上文中解释,实际也是使用MQ会遇到的问题
本地的实现颇为简单,主要是需要一个按照存取的队列(不一定有序,有序只是某种便利的实现),队列要完成以下功能
1、按照时间戳取数:当然有序是最好的只要取队列头判断时间即可支持是否已经超时(超过需要等待的时间)
2、写入阻塞:这不是一个必要条件(家里有矿可以把所有数据都存下来也没问题),阻塞的好处在于在对于在等待时间范围的数据直接阻塞在kafka中没必要拉到本地;主要是节省资源和降低内存压力,你完全无法知道对象需要耗费多大内存(我处理的数据确实很大)
3、队列监控:这个也许是习惯,对于所有这种延迟任务,人的厌烦等待的本能会让你很焦虑,开心很重要;
后续问题
后续遇到的问题,实际上也是这种方案的缺点所在,这是一个便宜的解决方案,最贵的资源就是kafka了;
1、队列大小问题:kafka的消费方案存在partition不均匀而且会彼此影响,因此队列大小很关键不能太小,最低要消除这种不平均带来的影响;初期队列开到40000左右(情况不同会略有差别)才抵挡住这种影响,当时还会存在一定量的堆积,表现为低峰时段消费任务数的暴增
2、内存压力:这个压力一方面来自与队列过大导致,另一方面也来自于阻塞队列导致的kafka缓存(基础架构组包装的框架也会使用缓存)堆积问题,在java方面表现为gc频繁和gc时间过长;java方面建议使用G1,傻瓜一些
3、分区优化:实际上kafka的一个优势表现在分区上,因此有些优化我们已在使用:
a、增加分区数来提供处理效率:增加分区数不是浪费资源那么简单,实际上是提高了并发执行数与减少单个分区的压力,这也是MQ优势的表现;实际上堆机器可以解决掉问题是很大的进步,不是所有问题都可以化解为这么简单的解决方法;
b、增加实例数:随着数据量的增加,我们队列的增加导致堆积愈发严重,出现过多次gc问题;因此后来我们调整为实例数与分区数保持一致,但是单个实例的内存(队列大小降低为1000)和cpu数,相对来说并没有增加太多资源
为什么不使用flink
flink擅长于计算任务,擅长那种cpu密集内存消耗不要太大的计算的任务,其中一个原因是他的checkpoint是他一个重要优点也是缺点;曾经尝试使用flink完成这个任务,实际上这种任务有明显的flink偏向(join任务);但是,需要的内存数和checkpoint时候的压力,让flink的同学抓狂;而且,checkpoint失败后,flink无法恢复带来的损失也我们也无法承受;这个只是一家之言,Blink已经在解决checkpoint的问题,但是内存消耗却是无法避免的;现在的lambda架构和kappa架构,孰优孰劣实际上也是看应用场景来说;
网友评论