美文网首页
RDD编程API

RDD编程API

作者: 嗷老板 | 来源:发表于2018-07-24 09:01 被阅读0次

    一、RDD的创建

    1、由一个已经存在的Scala集合创建

    2、由外部存储系统的文件创建

      包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等。

    3、已有的RDD经过算子转换生成新的RDD

    三、RDD编程API

    1.RDD 的算子分类

    • Transformation(转换):根据数据集创建一个新的数据集,计算后返回一个新RDD;例如:一个rdd进行map操作后生了一个新的rdd。
    • Action(动作):对rdd 结果计算后返回一个数值value 给驱动程序;例如:collect 算子将数据集的所有元素收集完成返回给驱动程序。

    2.Transformation

      RDD 中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark 更加有效率地运行。

    转换 含义
    map(func) 返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成
    filter(func) 返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成
    flatMap(func) 类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素)
    mapPartitions(func) 类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U]
    mapPartitionsWithIndex(func) 类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 (Int, Interator[T]) => Iterator[U]
    union(otherDataset) 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
    intersection(otherDataset) 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
    distinct([numTasks])) 对源 RDD 进行去重后返回一个新的 RDD
    groupByKey([numTasks]) 在一个(K,V)的 RDD 上调用,返回一个(K, Iterator[V])的 RDD
    reduceByKey(func, [numTasks]) 在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置
    sortByKey([ascending], [numTasks]) 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDD
    sortBy(func,[ascending], [numTasks]) 与 sortByKey 类似,但是更灵活
    join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的 RDD
    cogroup(otherDataset, [numTasks]) 在 类 型 为 (K,V) 和 (K,W) 的 RDD 上 调 用 , 返 回 一 个 (K,(Iterable<V>,Iterable<W>))类型的 RDD
    coalesce(numPartitions) 减少 RDD 的分区数到指定值。
    repartition(numPartitions) 重新给 RDD 分区
    repartitionAndSortWithinPartitions(part itioner) 重新给 RDD 分区,并且每个分区内以记录的 key 排序

    3.Action

    动作 含义
    reduce(func) reduce 将 RDD 中元素前两个传给输入函数,产生一个新的 return 值, 新产生的 return 值与 RDD 中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
    collect() 在驱动程序中,以数组的形式返回数据集的所有元素
    count() 返回 RDD 的元素个数
    first() 返回 RDD 的第一个元素(类似于 take(1))
    take(n) 返回一个由数据集的前 n 个元素组成的数组
    takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素
    saveAsTextFile(path) 将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本
    saveAsSequenceFile(path) 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。
    saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下
    countByKey() 针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数。
    foreach(func) 在数据集的每一个元素上,运行函数func
    foreachPartition(func) 在数据集的每一个分区上,运行函数func

    四、RDD 常用的算子操作

      启动 spark-shell进行测试:

        spark-shell--master spark://node1:7077
    

    练习1:map、filter
      map:对每一个元素进行操作
      filter:对每一个元素进行过滤

        //通过并行化生成rdd
        val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))
        //对rdd1 里的每一个元素乘 2 然后排序
        val rdd2 = rdd1.map(_ * 2).sortBy(x => x, true)
        //过滤出大于等于 5 的元素
        val rdd3 = rdd2.filter(_ >= 5)
        //将元素以数组的方式在客户端显示
        rdd3.collect
    

    练习 2:flatMap

        val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
        //将rdd1 里面的每一个元素先切分在压平
        val rdd2 = rdd1.flatMap(_.split(" ")) 
        rdd2.collect
    

    练习 3:交集、并集

    val rdd1 = sc.parallelize(List(5, 6, 4, 3))
    val rdd2 = sc.parallelize(List(1, 2, 3, 4))
    //求并集
    val rdd3 = rdd1.union(rdd2)
    //求交集
    val rdd4 = rdd1.intersection(rdd2)
    //去重
    rdd3.distinct.collect 
    rdd4.collect
    

    练习 4:join、groupByKey
      join:将集合中相同的键提取出来,对应的值组成一个元组
      join:将集合中相同的键提取出来,对应的值封装到compactBuffer中

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3),("kitty", 2)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1),("shuke", 2)))
    //求 join
    val rdd3 = rdd1.join(rdd2) 
    rdd3.collect
    //求并集
    val rdd4 = rdd1 union rdd2 
    rdd4.collect
    //按key 进行分组
    val rdd5=rdd4.groupByKey 
    rdd5.collect
    
    join groupByKey

    练习 5:cogroup

    val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("jim", 2)))
    //cogroup
    val rdd3 = rdd1.cogroup(rdd2)
    //注意cogroup 与groupByKey的区别
    rdd3.collect
    
    cogroup

    练习 6:reduce

    val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
    //reduce 聚合
    val rdd2 = rdd1.reduce(_ + _) 
    rdd2.collect
    

    练习 7:reduceByKey、sortByKey

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3),("kitty", 2),("shuke", 1)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3),("shuke", 2), ("kitty", 5))) val rdd3 = rdd1.union(rdd2)
    //按key 进行聚合
    val rdd4 = rdd3.reduceByKey(_ + _) 
    rdd4.collect
    //按 value 的降序排序
    val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t=> (t._2, t._1)) 
    rdd5.collect
    

    练习 8:repartition、coalesce

    val rdd1 = sc.parallelize(1 to 10,3)
    //利用repartition 改变rdd1 分区数
    //减少分区
    rdd1.repartition(2).partitions.size
    //增加分区
    rdd1.repartition(4).partitions.size
    //利用coalesce 改变rdd1 分区数
    //减少分区
    rdd1.coalesce(2).partitions.size
    

      注意:repartition可以增加和减少rdd中的分区数,coalesce只能减少 rdd分区数,增加rdd分区数不会生效。

    相关文章

      网友评论

          本文标题:RDD编程API

          本文链接:https://www.haomeiwen.com/subject/ncilmftx.html