美文网首页
scala使用par并行集合设置的线程池未关闭导致的内存占用问题

scala使用par并行集合设置的线程池未关闭导致的内存占用问题

作者: 达微 | 来源:发表于2019-06-13 08:29 被阅读0次

    问题产生背景

    程序语言使用的是scala。
    在处理大量数据时,使用了par函数将普通集合转为并行集合集合,通过多线程并发处理达到提升处理效率的目的,相关代码如下:

    for循环{
        ...
      process()
    }
    
    def process(): Unit ={
      val map: Map[String, Int] = Map("a"->1, "b"->2)
      val rangePar = map.par
      //新建固定线程数的线程池
      val pool = new collection.parallel.ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
      rangePar.tasksupport = pool
      rangePar.map(
        x=> {
          log.info(s"range Par element : $x")
        }
      )
      pool.environment.shutdown()
    }
    

    结果运行一段时间后,发现内存增加了不少而且还在不停地上升
    问题分析

    从业务上分析,数据量并没有明显增加,于是通过jvisualvm工具查看进程中对象的内存占用情况,发现在短时间内,scala.concurrent.forkjoin.ForkJoinTask[]对象在急剧上升,同时线程数也在不停增加,这些线程创建后,一直没有被释放,且处于waiting状态。这些增长中的没有释放的线程导致内存占用一直上升。

    根据ForkJoinTask这个类最终定位到上面的代码部分。然后在本地测试,发现随着循环次数的增加,内存中的线程数会一直上升且未释放掉,当循环次数为10时,就已经有14个线程处于waiting状态(不知道怎么上传图片 - -!)。

    因此定位问题是在循环中不停创建的线程池没有即使得到释放造成
    问题解决

    知道问题就好办了,有如下两个解决办法:

    (1) 在循环内每次使用完,就及时关闭资源池,释放占用的内存

    valpool = new collection.parallel.ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
    rangePar.tasksupport = pool
    rangePar.map(
      x=> {
        log.info(s"range Par element : $x")
      }
    )
    pool.environment.shutdown()
    

    (2) 将资源池对象设置为全局变量共享,这样也可以解决创建大量未释放线程池的问题。不过这种场景下,要处理好多线程下的情况,可以将线程池对象设置为ThreadLocal变量,各线程使用独立的对象,这样在一定程度上也能解决多线程共享的问题。

    相关文章

      网友评论

          本文标题:scala使用par并行集合设置的线程池未关闭导致的内存占用问题

          本文链接:https://www.haomeiwen.com/subject/npxlfctx.html