Spark Streaming的处理模式是按照Batch Duration进行Micro Batch Computation的,且如果上一批的数据没有处理完的话是不会处理下一批数据的!!这回导致几个恶果:
1、如果前面一个Batch数据量突然特别大的话,就会导致计算机的高度延迟,使得当前的Batch不能够得到及时的计算,以此类推...会陷入恶性循环;
2、在一个Batch处理的时候如果Task处理的时间波动比较大(例如说数据倾斜、数据的峰值、出错等),其他的Task都已经处理完了,所以整个Batch处理就只是等待这个Task处理完成,却不能够使用Memory和Cores等资源处理下一个Batch任务,会造成极大的资源浪费;
3、JVM的GC的巨大负担;
市面上说的解决方式:限流、改partition并行度、增加cores和memory等等
所以,唯一的效果显著的办法:不要等待!!!什么意思?就是无论Batch Duration数据大小和处理的复杂度,都会立即完成当前的Batch的处理,然后立即去处理下一个Batch任务!!!
怎么 做?此时我们既要完成业务计算,又要达到毫秒级别的延迟!
一个可能的办法是:Spark Streaming的业务逻辑放在线程池中!而绝妙之处在于,Spark Streaming程序执行的时候业务逻辑就是以
逻辑就是以Task的方式放在线程池中的!所以可以最大化的复用线程,及合理利用硬件资源!模拟代码如下:
dstream.foreachRDD{
rdd.foreachPartition(splite => {
//业务处理逻辑,如果直接处理的就会是阻塞式的,但是此时我们可以使用线程池去处理业务逻辑 (此处的线程池是你自己定义的)此时任务坑定是毫秒级别完成。
唯一需要注意的是线程数受限于物理硬件,所以需要根据实际情况设定线程池中并发Task的个数,例如不能够超过200个,如果超过了怎么办?新的Task就采用阻塞式操作,此时其他的线程在并行运行并且不断的完成Task,就会有新的可用线程来进行异步操作!
})
}
网友评论