美文网首页JavaSE
有序队列与异步拼接任务

有序队列与异步拼接任务

作者: 唐执 | 来源:发表于2019-07-17 11:01 被阅读2次

    最近遇到一个问题,有两个日志流(kafka)需要进行拼接,但是两个日志流并不是同步进行,而是其中一个日志流存在10分钟左右的延迟;所庆幸的是其中一个日志流只有标签数据数量较小,但是另一个日志流会较大,10分钟可以达到T级别的数量;

问题

   问题实际上只有一个就是数量巨大,这会带来几个问题

        1、存储问题:需要一个大容量的地方缓冲>10分钟的数据(两个队列数据),并且支持查询(join的时候需要查询出来),并且数据最好可以过期(除非家中有矿,可以持续存储这种数据)

        2、查询问题:除了存储问题,实现时肯定是遍历一个队列数据然后查询另一个数据是否到达,这就要求存储需要额外提供一种遍历的能力,应该是基于时间范围遍历的一种能力

        3、拼接时机问题:两个队列的在拼接时有两个方案可以选择

                a、实时拼接:即两个日志中只要存在匹配数据,就拼接完成然后下发数据;这个实时性较好,但是要求存储系统会有一定压力;但是实时拼接的前提是,确定什么状态才能确定已经拼接完成;

                b、延时拼接:即等到超时时间(例如:等10分钟再进行拼接)在处理数据;这个实时性差一些,但是压力会加少很多,只要存储提供基本的遍历能力即可;

解决思路

    业务上无法确定拼接完成的状态(需要等很多很多数据到达才行),因此只能硬生生的缓存这10分钟的数据,这个当然不是个好消息;

解决定位

    我将这个定义为一个延迟队列问题,而不是一个多线程等待问题,主要基于以下几点:

    a、多线程等待问题,实际上是任务在执行等待完成,而不是等待执行

    b、多线程等待问题的前提是,资源足够,即有内存和cpu资源足够;显然这个任务得家里有矿才行

    c、多线程等待问题是为提高处理效率,而这个任务多线程只会降低资源利用率

大堵小存


实际上主要的解决思路是:大堵小存;这个是基于kafka的一个解决思路,也是快速解决业务问题的一个便宜方法,基于现有资源;当然,flink也是一个解决方案,这个将后续讨论,下面将对方案进行解释

a、将小数据存储起来,根据消息实时更新存储;数据存储的方案很多,数据量较小,选择空间很大;

b、将大数据阻塞到kafka中,这是kafka允许的,当然会给kafka代码一部分压力;数据到达后去查询存储中的数据,完成数据的拼接;

c、本地有序队列,这个实际上是MQ的一个副作用,队列主要解决两个问题:

    1、业务有序问题;kafka只能保证了投递有序(即指定partition的数据投递时间有序),但是业务上时间无法保证(这个确实是存在的,有可能是发送时失败或者处理任务阻塞等)

    2、partition互相影响问题:基础架构组没有提供可以分partition消费的方案,而只提供来kafka提供的stream的方案;会导致如果一个实例中消费多余1个partition,就会出现数据量大的partition(partition很难消息数据完全一致的,即使一致也会因为耗时等问题导致资源倾斜)会不断的侵占资源,导致影响另一个partition的消耗;最后的表现为同一个实例,不同的partition的lag差距较大

本地有序队列

  本地会实现一个基于业务时间的有序队列,主要目的已经在上文中解释,实际也是使用MQ会遇到的问题

本地的实现颇为简单,主要是需要一个按照存取的队列(不一定有序,有序只是某种便利的实现),队列要完成以下功能

1、按照时间戳取数:当然有序是最好的只要取队列头判断时间即可支持是否已经超时(超过需要等待的时间)

2、写入阻塞:这不是一个必要条件(家里有矿可以把所有数据都存下来也没问题),阻塞的好处在于在对于在等待时间范围的数据直接阻塞在kafka中没必要拉到本地;主要是节省资源和降低内存压力,你完全无法知道对象需要耗费多大内存(我处理的数据确实很大)

3、队列监控:这个也许是习惯,对于所有这种延迟任务,人的厌烦等待的本能会让你很焦虑,开心很重要;

后续问题

后续遇到的问题,实际上也是这种方案的缺点所在,这是一个便宜的解决方案,最贵的资源就是kafka了;

1、队列大小问题:kafka的消费方案存在partition不均匀而且会彼此影响,因此队列大小很关键不能太小,最低要消除这种不平均带来的影响;初期队列开到40000左右(情况不同会略有差别)才抵挡住这种影响,当时还会存在一定量的堆积,表现为低峰时段消费任务数的暴增

2、内存压力:这个压力一方面来自与队列过大导致,另一方面也来自于阻塞队列导致的kafka缓存(基础架构组包装的框架也会使用缓存)堆积问题,在java方面表现为gc频繁和gc时间过长;java方面建议使用G1,傻瓜一些

3、分区优化:实际上kafka的一个优势表现在分区上,因此有些优化我们已在使用:

    a、增加分区数来提供处理效率:增加分区数不是浪费资源那么简单,实际上是提高了并发执行数与减少单个分区的压力,这也是MQ优势的表现;实际上堆机器可以解决掉问题是很大的进步,不是所有问题都可以化解为这么简单的解决方法;

    b、增加实例数:随着数据量的增加,我们队列的增加导致堆积愈发严重,出现过多次gc问题;因此后来我们调整为实例数与分区数保持一致,但是单个实例的内存(队列大小降低为1000)和cpu数,相对来说并没有增加太多资源

为什么不使用flink

    flink擅长于计算任务,擅长那种cpu密集内存消耗不要太大的计算的任务,其中一个原因是他的checkpoint是他一个重要优点也是缺点;曾经尝试使用flink完成这个任务,实际上这种任务有明显的flink偏向(join任务);但是,需要的内存数和checkpoint时候的压力,让flink的同学抓狂;而且,checkpoint失败后,flink无法恢复带来的损失也我们也无法承受;这个只是一家之言,Blink已经在解决checkpoint的问题,但是内存消耗却是无法避免的;现在的lambda架构和kappa架构,孰优孰劣实际上也是看应用场景来说;

相关文章

  • 有序队列与异步拼接任务

    最近遇到一个问题,有两个日志流(kafka)需要进行拼接,但是两个日志流并不是同步进行,而是其中一个日志流存在10...

  • GCD总结

    一,基本 1,串行队列 异步任务 串行队列 异步任务,会创建子线程,且只创建一个子线程,异步任务执行是有序的。 2...

  • JavaScript 异步编程

    同步模式与异步模式 时间循环与消息队列 异步编程的几种方式 Primise异步方案 宏任务 /微任务队列 Ge...

  • Objective-C基础-多线程

    1、常见多线程方案 2、队列与同步异步 例子 练习 3、队列组的使用 异步并发执行任务1、任务2等任务1、任务2都...

  • 【Flask】简单的后台异步队列

    异步任务队列 异步队列输入 异步队列获取并执行 创建线程开始执行方法并返回进程号 主函数与flask main一起...

  • js运行机制

    在运行时会形成任务队列,分为同步任务队列和异步任务对列,同步队列优先加载异步任务队列会形成队列任务池,定时器不会一...

  • nextTick、setImmediate、promise.re

    执行队列(同步) 任务队列(异步事件) 每当异步任务完成后会在任务队列中添加一个关联事件,待执行队列任务执行完成...

  • GCD小总结

    单例模式 串行队列同步/异步执行任务 并发队列同步/异步执行任务 队列组 延时执行 barrier

  • iOS多线程同步异步、串行并发浅析

    先来说一个队列和任务: 队列分为串行队列与并行队列 任务的执行分为同步执行与异步执行 这两两组合就成为了串行队列同...

  • 《iOS高级开发之多线程编程之三》

    主队列上的同步异步执行 主队列 异步执行 在主线程有序执行dispatch_queue_t queue = di...

网友评论

    本文标题:有序队列与异步拼接任务

    本文链接:https://www.haomeiwen.com/subject/oxxqlctx.html