美文网首页
Spark core RDD API

Spark core RDD API

作者: 王龙江_3c83 | 来源:发表于2019-03-19 13:45 被阅读0次

    1. RDD 的概述

    1.1 RDD 的优势

    • 高效容错
    • 可以控制数据的分区来优化计算性能
    • 并行处理
    • 提供了丰富的操作数据的 API
    • 可以显式的将任何类型的中间结果存储在内存中

    1.2 RDD 的方法

    函数 功能
    compute(split: Partition, context: TaskContext): Iterator[T] 一个计算每个分区数据的函数
    getPartitions: Array[Partition] 一个分区列表,用于并行计算
    getDependencies: Seq[Dependency[_]] = deps 一个依赖列表,这个rdd依赖的父rdd是哪些(在计算的时候可以通过这个依赖来容错)
    getPreferredLocations(split: Partition): Seq[String] = Nil 分区数据的优先存储地址。
    val partitioner: Option[Partitioner] = None RDD 是如何分区的,比如某个rdd是通过hash partitioner得到的

    1.3 常见类型的 RDD

    1.3.1 MapPartitionsRDD

    • 将自定义的函数应用到父亲 RDD 一个分区到输出数据中去。
    • 有且仅有一个窄依赖,即只依赖一个父亲 RDD。
    • 分区器:可以选择是否保留父亲 RDD 的分区器。
    • 计算分区列表:继承父亲 RDD 的分区列表。

    1.3.2 ParallelCollectionRDD

    1.3.3 ShuffleRDD

    2. RDD 的创建

    函数 功能
    sc.textFile(path) 从文件系统中(本地文件系统或 HDFS ),得到行数据的 HadoopRDD。
    sc.sequenceFile[KeyClass,ValueClass](path) 加载 HDFS sequenceFile 文件。
    sc.parallelize(Seq(1,2,3)) 从内存中已经存在的序列列表(Seq、List、Array)中,得到 ParallelCollectionRDD
    sc.makeRDD(Seq(1,2)) parallelize 别名
    sc.range(start,end) 创建区间为 [start,end) 的 MapPartitionsRDD

    3. 依赖

    3.1 窄依赖

    父亲 RDD 的一个分区数据只能被子 RDD 的一个分区消费,子 RDD 的一个分区可以对应父 RDD 的多个分区。

    • OneToOneDependency:map、filter
    • RangeDependency:union
    • 本地性,一个父分区计算完,子分区计算。
    • 失败后计算失败分区。
    • 和父亲 RDD 的 Partitioner 相同,并且关联属性是分区键,则不发生 shuffle。

    3.2 宽依赖

    父亲 RDD 的一个分区数据被子 RDD 的多个分区消费。

    • ShuffleDependency:reduceByKey

    4. Partitioner

    给这个RDD数据进行分区的分区器。

    • 从存储系统创建的 RDD 不需要分区,HDFS 有多少数据块,就有多少分区。
    • 非 key-value 结构没必要分区。
    • key-value 结构需要分区,分区对象为 key。

    4.1 实现

    • KV 算子,作为参数传入
    • 调用 RDD partitionBy 函数
    • 调用 RDD coalesce 函数
    • 调用 RDD repartition 函数

    4.2 分区优化

    • 对 RDD 预分区能提高性能。
    • 是否保留父 RDD 的分区器,mapValue、flatMapValue 方法保留父 RDD 的分区器。

    4.3 对比

    HashPartitioner RangePartitioner
    将可以排序的 key 分到几个大概相等的范围分区内的一个分区中。
    不支持 Array 类型的 key。 不支持不能排序的 Key。
    可能导致数据倾斜。 可以解决分区数据倾斜的问题。
    分区后的数据不会排序。 分区后分区之间的数据是有序的。

    5. 单类型 RDD 操作 API

    5.1 Transformations

    方法 功能
    map(func) 接收函数,将函数应用到 RDD 中到每一个元素,返回新的 RDD。
    mapPartition(func) 类似于 map,但独立地在RDD的每一个分片上运行,因此在类型为 T 的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
    mapPartitionsWithIndex 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是 (Int, Interator[T]) => Iterator[U]
    flatMap 对每个输入元素,可以输出多个元素。
    sample
    filter 接收函数,返回只包含满足 filter 函数的元素的新 RDD。
    distinct 去重

    5.2 Actions

    方法 功能
    collect 返回 RDD 的所有元素。
    count 计数。
    countByValue 返回一个 Map,表示唯一元素出现的次数。
    take 返回几个元素。
    top 返回前几个元素。
    takeOrdered 返回基于提供的排序算法前几个元素。
    takeSample(withReplacement,num,[seed]) 取样
    reduce 合并 RDD 中的元素
    fold 与 reduce 相似,提供 zero value,rdd.flod(0)(+)
    aggregate()
    foreach() 遍历 rdd 中的每个元素。

    6. key-value 类型 RDD 操作 API

    6.1 key-value 类型 RDD 生成方法

    • 自定义
    • map 函数生成。
    • keyBy 函数生成。
    • groupBy 函数生成,在 keyBy 的基础上,将 key 相同的元素进行聚合,基于 groupByKey 实现,相当于 keyBy+groupByKey。

    6.2 KeyValue 对 RDD

    函数 功能
    combinerByKey
    aggregateByKey aggregateByKey((0,0))(mergeValue,mergeCombiner)可以实现 value 值、和词频统计功能。
    reduceByKey createCombiner 不对数据进行任何处理,mergeValue 和 mergeCombiner 调用传入的 reduce 函数。
    distinct 基于 reduceByKey 实现,键值对都相同,则去重。reduce=((x,y)=>x)
    foldByKey(n) createCombiner=mergeValue(n,value)
    groupByKey createCombiner:元素转 ArrayBuffer 集合;mergeValue:将新元素添加进集合;mergeCombiner:集合合并。
    sortByKey
    sortBy

    7. 二元操作 API

    方法 功能
    union 并集
    intersection 交集
    subtract 差集
    方法 功能
    persist(StorageLevel) 给 RDD 的 StorageLevel成员变量(默认为 None)赋值,存储级别:MEMORY_ONLY(默认)、DISK_ONLY、MEMORY_AND_DISK、OFF_HEAP。
    cache 分布式缓存,等于 cache(StorageLevel.MEMORY_ONLY)。
    unpersist 移除持久化数据。
    ietrator 获得当前 RDD 的输出。
    localCheckpoint() 本地磁盘文件,等于 cache(StorageLevel.MEMORY_AND_DISK)。
    checkpoint() HDFS 文件系统。

    8.集成 Spark SQL

    8.1 集成步骤

    8.1.1 SparkSession & SQLContext

        val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
        val sc = new SparkContext(conf)
        val spark = SparkSession.builder.config(conf).getOrCreate()
        import spark.implicits._
    
        val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
    

    8.1.2 将外部数据映射为 RDD

        val fileRDD = sc.textFile("/Users/dreamaker/Downloads/data/people.txt")
        // 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row
        val rowRDD: RDD[Row] = fileRDD.map(line => {
          val fields = line.split(",")
          Row(fields(0), fields(1).trim.toInt)
        })
    

    8.1.3 使用 class 或 StructType 封装数据

    val peopleRDD = sc.textFile("/Users/dreamaker/Downloads/data/people.txt")
          .map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
        // 将RDD 转换成 DataFrames
        val df = peopleRDD.toDF
    
    val structType: StructType = StructType(
          //字段名,字段类型,是否可以为空
          StructField("name", StringType, true) ::
            StructField("age", IntegerType, true) :: Nil
        )
        /**
          * rows: java.util.List[Row],
          * schema: StructType
          * */
        val df: DataFrame = sqlContext.createDataFrame(rowRDD,structType)
    

    8.1.4 使用 DataFrame 创建临时表

        df.createOrReplaceTempView("people")
        sqlContext.sql("select * from people").show()
    

    8.1.5 将 df 写入外部数据源

    df.write.format("parquet").save("/Users/dreamaker/Downloads/data/people.parquet")
    

    参考资料

    相关文章

      网友评论

          本文标题:Spark core RDD API

          本文链接:https://www.haomeiwen.com/subject/ixpgmqtx.html