性能优化
为了获得运行在集群上的Spark应用的最好的性能需要一些调优。这一部分介绍了一些能够改善应用性能的一些参数和配置。从更高的角度看,你需要考虑两件事情:
1、高效的使用集群的资源从而降低每个batch的数据处理的时间
2、设置合理的batch的大小从而数据使得数据处理的速度和接收速度一样。
减少每个Batch的处理时间
为了减少每个batch的处理时间 是有许多可以优化操作,稍后 Tuning Guide
中有讨论。这一节着重讲述了比较重要的一些优化。
数据接收的并行度
通过网络接收到的数据需要数据在spark内进行反序列化和存储。如果数据的接收变成了系统的瓶颈,就需要考虑并行处理数据的接收。需要注意的是,每个输入流在worker的节点上创建了单独的receiver,它只会接收一个流的数据。接收多个数据流的花需要通过创建多个数据流并且配置它们使得能够接收输入流的不同分区的数据。举个例子,一个kafka的输入DStream 接收两个topic的数据可以其分成两个stream,每个接收一个topic。这样就会运行两个receiver,这样就会并行接收数据,因而提高了整体的吞吐量。多个DStream 可以union成一个DStream,然后之前应用于一个DStream的transformation 操作就可以应用于被unioned的DStream上了,操作入下:
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
另外一个需要考虑的就是 receiver的 block interval(区块时长暂时翻译),这个由参数configuration parameter spark.streaming.blockInterval 决定
接收多个数据流数据另外一个选择是使用inputStream.repartition(<number of partitions>) 明确的切分输入数据流。它会在数据处理之前,把接收到的多个batch的数据分发到集群中的指定数量的机器上
数据处理的并行度
如果并行计算的task的数量不够大的话,集群的资源利用率是不高的。举个例子,分布式的reduce操作,比如reduceByKey和reduceByKeyAndWindow,这个默认的并行度是参数 spark.default.parallelism 控制的。你可以作为参数传入并行度或者配置这个参数来修改默认值。
数据的序列化
数据序列化的压力可以通过优化序列化方式的方法解决降低。针对于streaming的情况,这里有两种类型的数据可以被序列化。
- InputData:默认情况下,Receiver接收到的数据是存储在executors的内存内的,存储的级别是StorageLevel.MEMORY_AND_DISK_SER_2.也就是说,数据是被序列化成字节以减少刚才的压力。并且会被复制以防executor失败。而且数据会优先保存在内存,直到需要计算的数据在内存已经保存不下的时候才会写入到磁盘。这些序列化明显过度耗费资源,reciiver必须反序列化接收到的数据并且重新序列化成spark序列化的方式。
- Streaming操作产生的持久化的RDD streaming计算产生的RDD会被持久化在内存中,举个例子,窗口操作会在内存中持久化这些数据以防数据需要多次被处理。当时不同于SparkCore默认的StorageLevel.MEMORY_ONLY,持久化的RDD是默认是按照StorageLevel.MEMORY_ONLY_SER进行持久化的。
以上两种情形,使用Kryo序列化会降低CPU和内存的过度使用。
在某些特殊的情况下,比如spark不需要保持大量的数据,持久化数据使用反序列化后的对象不会导致过度的gc压力,所以也是一种可行的办法。举个例子,如果你在使用一个几秒的batch并且没有window的操作,你可以显示的设置storage的级别从而关闭序列化。这将会减少cpu因为序列化而导致的压力,从而提升性能。
启动的Task过多
如果每秒钟启动的task的数量非常高(比如,每秒50或者更多),那么分发任务到slave上的压力将会非常大,并且将会使得要想获得ms级别的延迟变得很难。这种压力可以通过如下的改变降低:
执行模式:执行Spark使用standalone 模式或者粗粒度的 Mesos模式task的启动时间会优于使用细粒度的Mesos的模式 可以参考 Running on Mesos guide
这个改变可以减少每个batch的时间到几百ms,从而是的亚秒级的batchsize变得可行。
设置正确的batch间隔
为了保证运行在集群上的spark应用稳定,必须保证数据处理的速度要达到数据接收的速度。换句话说,每个batch处理数据的速度必须和产生的速度一致。是否能一致可以通过monitoring的web ui 上的处理时间看到。正常情况下,处理时间要小于间隔时间。
取决于streaming计算的天然特征,对于固定资源的集群,batch的间隔对于数据在应用中的保持率有重大影响。举个例子,比如 WordCountNetwork,针对于特定的数据速率,系统可以支持每隔2s的单词统计,但是却不能支持500ms的。因此,batch的间隔时间需要设置成实际生产中需要保持的的期望的数据速率。
一个好方法就是 计算一个合适的batch的大小 去测试一 保守的batch间隔和一个比较低的数据速率。为了验证系统是否能跟上数据的速率,你可以查看每个处理过的batch的端到端的处理延迟。如果delay的时间和batch的大小差不很多,那么系统就是稳定。否则的花,如果delay持续增加,意味着系统跟不上数据的速率从而变得不稳定。一旦你有两个一个固定的配置,你就可以尝试增加数据的速率或者减少batch的大小。需要注意的是,由于缓存数据增加导致的内存增加是ok的,只要延迟时间降到一个很低的值。
网友评论