问题产生背景
程序语言使用的是scala。
在处理大量数据时,使用了par函数将普通集合转为并行集合集合,通过多线程并发处理达到提升处理效率的目的,相关代码如下:
for循环{
...
process()
}
def process(): Unit ={
val map: Map[String, Int] = Map("a"->1, "b"->2)
val rangePar = map.par
//新建固定线程数的线程池
val pool = new collection.parallel.ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
rangePar.tasksupport = pool
rangePar.map(
x=> {
log.info(s"range Par element : $x")
}
)
pool.environment.shutdown()
}
结果运行一段时间后,发现内存增加了不少而且还在不停地上升
问题分析
从业务上分析,数据量并没有明显增加,于是通过jvisualvm工具查看进程中对象的内存占用情况,发现在短时间内,scala.concurrent.forkjoin.ForkJoinTask[]对象在急剧上升,同时线程数也在不停增加,这些线程创建后,一直没有被释放,且处于waiting状态。这些增长中的没有释放的线程导致内存占用一直上升。
根据ForkJoinTask这个类最终定位到上面的代码部分。然后在本地测试,发现随着循环次数的增加,内存中的线程数会一直上升且未释放掉,当循环次数为10时,就已经有14个线程处于waiting状态(不知道怎么上传图片 - -!)。
因此定位问题是在循环中不停创建的线程池没有即使得到释放造成
问题解决
知道问题就好办了,有如下两个解决办法:
(1) 在循环内每次使用完,就及时关闭资源池,释放占用的内存
valpool = new collection.parallel.ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
rangePar.tasksupport = pool
rangePar.map(
x=> {
log.info(s"range Par element : $x")
}
)
pool.environment.shutdown()
(2) 将资源池对象设置为全局变量共享,这样也可以解决创建大量未释放线程池的问题。不过这种场景下,要处理好多线程下的情况,可以将线程池对象设置为ThreadLocal变量,各线程使用独立的对象,这样在一定程度上也能解决多线程共享的问题。
网友评论