美文网首页
Spark权威指南读书笔记(三):低级API

Spark权威指南读书笔记(三):低级API

作者: kaiker | 来源:发表于2020-11-29 16:32 被阅读0次

    第十二章 弹性分布式数据集

    • 什么是低级API:用于处理分布式数据(RDD)、用于分发和处理分布式共享变量
    • 如何使用低级API:SparkContext是低级API函数库的入口,可以通过SparkSession来获取SparkContext

    RDD

    • DataFrame Dataset运行时都编译为RDD
    • RDD是一个只读不可变的且已分块的记录集合,可以被并行处理
    • RDD中记录的是对象
    RDD类型

    通用型RDD
    key-value RDD,支持特殊操作并支持按key自定义数据分片

    每个RDD具有五个主要内部属性:

    • 数据分区列表
    • 作用在每个分区的计算函数
    • 描述与其他RDD的依赖关系列表
    • 为key-value RDD配置的分区方法
    • 优先位置列表,指定每个分区的处理位置偏好

    RDD使用方法

    创建RDD
    spark.range(500).rdd
    spark.range(10).toDF().rdd.map(rowObject => rowObject.getLong(0))
    spark.sparkContext.textFile("/some/path/withTextFiles")
    
    转换操作
    • filter
    def startsWithS(individual:String) = {
      individual.startsWith("S")
    }
    words.filter(word => startsWithS(word)).collect()
    
    • map
      将给定数据集中的记录一条一条地输入该函数处理以得到你期望的结果(1对1)
      比如这样把一条记录打成了三行

    val words2 = words.map(word => (word, word(0), word.startsWith("S")))

    • flatmap
      每个当前行会映射为多行(1对多),输出一个可展开的迭代器

    words.flatMap(word => word.toSeq).take(5)

    • randomSplit
      将一个RDD随机分成若干个RDD,这些RDD组成一个RDD的Array返回
      两个参数,一个是权重Array,一个是随机种子

    val fiftyFiftySplit = words.randomSplit(Array[Double](0.5, 0.5))

    动作操作
    • reduce
      reduce方法在指定一个函数将RDD中任何类型的值“规约”为一个值
      给定一组数字求和,reduce方法制定一个函数来接收两个参数,然后对它们求和返回一个新值,新值再作为其中一个参数继续传递给该函数,另一个参数为下一个数字,最终得到一个数字
    • take
      从RDD中读取一定数量的值
    缓存

    默认情况下,仅对内存中的数据进行缓存和持久化

    words.cache()

    通过pipe方法调用系统命令操作RDD

    将每个数据分区交给指定的外部进程来计算得到结果RDD,每个输入分区的所有元素被当做另一个外部进程的标准输入,输入元素由换行符分隔

    • mapPartitions
      map是mapPartitions基于行操作的一个别名,mapPartitions函数每次处理一个数据分区

    words.mapPartitions(part => IteratorInt).sum() // 2

    • foreachPartition
      mapPartition函数需要返回值,但是foreachPartition函数不需要
    words.foreachPartition { iter =>
      import java.io._
      import scala.util.Random
      val randomFileName = new Random().nextInt()
      val pw = new PrintWriter(new File(s"/tmp/random-file-${randomFileName}.txt"))
      while (iter.hasNext) {
          pw.write(iter.next())
      }
      pw.close()
    }
    
    • glom
      可以将数据集中每个分区都转换为数组
    spark.sparkContext.parallelize(Seq("Hello", "World"), 2).glom().collect()
    // Array(Array(Hello), Array(World))
    

    第十三章 高级RDD

    Key-Value RDD

    • 操作通常形如<some-operation>ByKey
      最简单的操作:

    words.map(word => (word.toLowerCase, 1))
    也可通过keyBy函数生成

    val keyword = words.keyBy(word => word.toLowerCase.toSeq(0).toString)
    // 输出的形式:[('s', 'Spark'), ('t', 'THE')]
    
    • lookup

    keyword.lookup("s")

    聚合操作
    • countByKey

    KVcharacters.countByKey()

    • groupByKey

    KVcharacters.groupByKey().map(row => (row._1, row._2.reduce(addFunc))).collect()

    会把所以key放到一起进行计算,可能出现倾斜

    • reduceByKey
      reduce发生在每个分组,不需要将所有内容放在内存,此操作不会导致shuffle过程

    KVcharacters.reduceByKey(addFunc).collect()

    • groupByKey和reduceByKey
      https://www.jianshu.com/p/0c6705724cff
      当调用 groupByKey时,所有的键值对(key-value pair) 都会被移动,在网络上传输这些数据非常没必要,因此避免使用 GroupByKey
    • aggregateBykey
      基于key聚合,不基于分区聚合
    • combineByKey
      可以指定聚合函数和合并函数
    val valToCombiner = (value:Int) => List(value)
    val mergeValuesFunc = (vals:List[Int], valToAppend:Int) => valToAppend :: vals
    val mergeCombinerFunc = (vals1:List[Int], vals2:List[Int]) => vals1 ::: vals2
    // now we define these as function variables
    val outputPartitions = 6
    KVcharacters
      .combineByKey(
        valToCombiner,
        mergeValuesFunc,
        mergeCombinerFunc,
        outputPartitions)
      .collect()
    
    • foldByKey

    KVcharacters.foldByKey(0)(addFunc).collect() //0为加法,1为乘法

    连接

    co group

    可以将三个key-value RDD一起分组,key在一边,所有value在另一边

    import scala.util.Random
    val distinctChars = words.flatMap(word => word.toLowerCase.toSeq).distinct
    val charRDD = distinctChars.map(c => (c, new Random().nextDouble()))
    val charRDD2 = distinctChars.map(c => (c, new Random().nextDouble()))
    val charRDD3 = distinctChars.map(c => (c, new Random().nextDouble()))
    charRDD.cogroup(charRDD2, charRDD3).take(5)
    
    zip

    zip把两个RDDyuansu匹配在一起,要求两个RDD元素个数相同,分区数也相同,生成一个PairRDD

    控制分区

    使用RDD可以控制数据在整个集群上的物理分布

    自定义分区
    • 自定义分区是使用RDD的主要原因之一,结构化API不支持自定义数据分区
    • 自定义分区的唯一目标是将数据均匀地分布到整个集群中
    import org.apache.spark.Partitioner
    class DomainPartitioner extends Partitioner {
     def numPartitions = 3
     def getPartition(key: Any): Int = {
       val customerId = key.asInstanceOf[Double].toInt
       if (customerId == 17850.0 || customerId == 12583.0) {
         return 0  //把这两个数据量太大的用户放到一个分区里
       } else {
         return new java.util.Random().nextInt(2) + 1
       }
     }
    }
    
    keyedRDD
      .partitionBy(new DomainPartitioner).map(_._1).glom().map(_.toSet.toSeq.length)
      .take(5)
    

    第十四章 分布式共享变量

    • 第二种低级API是分布式共享变量,有两种:广播变量和累加器
    • 累加器将所有任务中的数据累加到一个共享结果中
    • 广播变量允许你在所有工作节点上保存一个共享值

    广播变量

    • 广播变量是共享的、不可修改的变量,它们缓存在集群中的每个节点上,不需要每个任务都反复序列化


      广播变量
    val supplementalData = Map("Spark" -> 1000, "Definitive" -> 200,
                               "Big" -> -300, "Simple" -> 100)
    val suppBroadcast = spark.sparkContext.broadcast(supplementalData)
    

    累加器

    • 累加器可以将转换操作更新的值以高效和容错的方式传输到驱动节点
    • 累加器仅支持由满足交换律和结合律的操作进行累加的变量,因此对累加器的操作可以被高效并行
    • 对于仅发生在动作操作内执行的累加器更新,Spark保证每个任务对累加器的更新只发生一次,重新启动的任务不会再次更新该值
    • 在转换操作中,累加器可能由于重新执行发生多次变化


      累加器

      一个统计例子

    case class Flight(DEST_COUNTRY_NAME: String,
                      ORIGIN_COUNTRY_NAME: String, count: BigInt)
    val flights = spark.read
      .parquet("/data/flight-data/parquet/2010-summary.parquet")
      .as[Flight]
    
    import org.apache.spark.util.LongAccumulator
    val accUnnamed = new LongAccumulator
    val acc = spark.sparkContext.register(accUnnamed)
    
    val accChina = new LongAccumulator
    val accChina2 = spark.sparkContext.longAccumulator("China")
    spark.sparkContext.register(accChina, "China")
    
    def accChinaFunc(flight_row: Flight) = {
      val destination = flight_row.DEST_COUNTRY_NAME
      val origin = flight_row.ORIGIN_COUNTRY_NAME
      if (destination == "China") {
        accChina.add(flight_row.count.toLong)
      }
      if (origin == "China") {
        accChina.add(flight_row.count.toLong)
      }
    }
    
    flights.foreach(flight_row => accChinaFunc(flight_row))
    
    accChina.value // 953
    

    相关文章

      网友评论

          本文标题:Spark权威指南读书笔记(三):低级API

          本文链接:https://www.haomeiwen.com/subject/tyhkwktx.html