美文网首页大数据学习Scala学习
Spark中使用Scala synchronized 并发加锁创

Spark中使用Scala synchronized 并发加锁创

作者: xiaogp | 来源:发表于2020-10-30 16:11 被阅读0次

    摘要:SparkScalasynchronized

    executor端共享变量

    Spark的rdd调用map或者foreachmapPartitionforeachPartition等方法时,每一个元素或者分区都会并发调用其他方法进行操作,如果其中涉及比如新建一个内存对象,新建一个数据库连接等操作则需要频繁创建对象增加不必要的开销,此时需要将这些对象处理成单例,让程序仅在内存中创建一个对象,executor中所有partition共享这些对象。

    synchronized关键字

    Scala中的synchronized可以对代码块或者方法使用,使得每次只能有一个线程访问,当一个线程获取了锁,其他线程在队列上等待。
    在SiteTypeUtils单例对象中创建whiteSiteList属性初始值为null,synchronized加锁双重校验whiteSiteList是否为null,加锁后并发调用的线程排队进入,第一个线程进入后读取mysql数据实例化whiteSiteList,第二个线程进入再次判断whiteSiteList,此时whiteSiteList不为null此后所有进入锁的线程不会在内存中再次读取mysql创建whiteSiteList。下一轮并发过来先判断whiteSiteList已经不为null,不需要再去走加锁的代码段,一开始就判断null的目的是不需要每次都去获取锁,提升性能。

    object SiteTypeUtils {
      var whiteSiteList: List[String] = _
    
      def getSiteType(configProperties: Properties, url: String): String = {
          if (whiteSiteList == null) {
            this.synchronized {
              if (whiteSiteList == null) {
                println("---------------------初始化读取白名单mysql数据---------------------")
                whiteSiteList = getSiteUrlStatus(configProperties, "select url from dt_white_site where status = 'white'")
                println("white数量:", whiteSiteList.size)
              }
            }
          }
        }
    
      def main(args: Array[String]): Unit = {
      }
    }
    

    观察Spark打印的日志,发现在每个executor中只读取了一次whiteSiteList


    spark日志1.png

    如果不使用synchronized双重校验,会在Spark第一次并发调用这个方法时读取多次mysql和重复创建whiteSiteList,日志如下


    spark日志2.png

    相关文章

      网友评论

        本文标题:Spark中使用Scala synchronized 并发加锁创

        本文链接:https://www.haomeiwen.com/subject/ndrdvktx.html