Spar05 RDD 转换算子

作者: 山高月更阔 | 来源:发表于2020-05-23 19:34 被阅读0次

    RDD 简介

    rdd 是 spark 对数据的核心抽象,全称弹性分布式数据集( Resilient Distributed Dataset 简称 RDD )
    spark 中的 RDD 是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点。RDD 可以包含 Python、Java、Scala 中任意类型的数据对象,甚至可以包含用户自定义对象。
    在 spark 中对 RDD 的操作有创建 RDD、转化已有的 RDD 以及调用 RDD 操作进行求值。

    创建 RDD

    parallelize 方法

    python 中的 parallelize() 方法

    >>> sc.parallelize(['a','b','c'])
    ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
    

    Scala 中的 parallelize() 方法

    >>> sc.parallelize(['a','b','c'])
    ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:195
    

    textFile 方法

    python textFile() 方法

    sc.textFile('/path/file')
    

    Scala textFile() 方法

    sc.textFile('/path/file')
    

    文件路劲说明
    hadoop 的配置 $HADOOP_HOME/etc/hadoop/core-site.xml 中配置

     <property>
            <name>fs.defaultFS</name>
            <value>hdfs://localhost:9000</value>
        </property>
    

    如果有上述配置textFile 默认从hdfs中读取文件
    如果需要指定
    指定从本地文件中读取

    sc.textFile('file:///path/file')
    

    指定从hdfs文件中读取

    sc.textFile('hdfs:/path/file')
    

    db中读取数据创建 rdd

    可以从db读取数据组装成 list 在通过parallelize 方式创建

    有了 RDD 必然需要多 RDD 中的数据进行操作 对 RDD 操作的函数 也称为算子

    map 算子

    将算子应用于每一个数据并返回一个新的RDD
    python

    >>> rdd = sc.parallelize([1,2,3])
    >>> rdd1=rdd.map(lambda x:x+1)
    >>> rdd1.collect()
    [2, 3, 4]  
    

    创建一个数字列表RDD map将每个数字加1操作
    Scala

    scala> val rdd = sc.parallelize(List(1,2,3))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    scala> val rdd1 = rdd.map(_+1)
    rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
    scala> rdd1.collect()
    res0: Array[Int] = Array(2, 3, 4)   
    

    从python和Scala的方式来看 语法上略有差异 本质操作还是一样的

    flatMap 算子

    flatMap 也是作用于每个元素并返回值,将新值组装一个新的 rdd 与 map 不同的是 map返回一个元素 flatMap返回一个列表 flatMap通常用来切分单词
    python

    >>> rdd = sc.parallelize(['张三 李四','王五 李丽'])
    >>> rdd1 = rdd.flatMap(lambda line : line.split(' '))
    >>> rdd1.collect()
    ['张三', '李四', '王五', '李丽']
    

    假如用map做同样操作

    >>> rdd = sc.parallelize(['张三 李四','王五 李丽'])
    >>> rdd1 = rdd.map(lambda line : line.split(' '))
    >>> rdd1.collect()
    [['张三', '李四'], ['王五', '李丽']]
    

    通过比较结果很清晰flatMap与map不同。 map返回元素作为rdd 的一个元素 即使是list 也是 rdd 的一个元素 只是这个元素是list flatMap返回list中每个元素都是rdd中的一个元素

    Scala

    scala> val rdd = sc.parallelize(List("张三 李四","王五 李丽"))
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24
    
    scala> val rdd1 = rdd.flatMap(_.split(" "))
    rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at flatMap at <console>:25
    
    scala> rdd1.collect()
    res1: Array[String] = Array(张三, 李四, 王五, 李丽)
    

    distinct 算子

    去重
    python

    >>> rdd = sc.parallelize(['a','a','b','c'])
    >>> rdd1 = rdd.distinct()
    >>> rdd1.collect()
    ['b', 'c', 'a'] 
    

    Scala

    scala> val rdd = sc.parallelize(List("a","a","b","c"))
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
    
    scala> val rdd1 = rdd.distinct()
    rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at distinct at <console>:25
    scala> rdd1.collect()
    res2: Array[String] = Array(a, b, c)
    

    sample 算子

    从原 rdd 中抽样 有三个参数

    • withReplacement: true表示有放回的采样,false表示无放回采样
    • fraction:期望样本的大小作为RDD大小的一部分
      当withReplacement=false时:选择每个元素的概率;分数一定是[0,1] ;
      当withReplacement=true时:选择每个元素的期望次数; 分数必须大于等于0。
    • seed:随机数生成器的种子

    如: 元素不可以多次抽样:withReplacement=false,每个元素被抽取到的概率为0.5:fraction=0.5
    python

    >>> rdd = sc.parallelize(['a','a','b','c'])
    >>> rdd.sample(False,0.5).collect()
    ['b', 'c']
    >>> rdd.sample(False,0.5).collect()
    ['a', 'c']
    >>> rdd.sample(False,0.5).collect()
    ['a', 'b', 'c']
    

    如: 元素可以多次抽样:withReplacement=true,每个元素被抽取到的期望次数为2:fraction=2
    python

    >>> rdd = sc.parallelize(['a','a','b','c'])
    >>> rdd.sample(True,2).collect()
    ['a', 'a', 'a', 'b', 'b', 'b', 'c', 'c', 'c']
    >>> rdd.sample(True,2).collect()
    ['a', 'a', 'a', 'a', 'b', 'c', 'c']
    >>> rdd.sample(True,2).collect()
    ['a', 'a', 'a', 'b', 'c', 'c', 'c']
    

    以上算子都是对单个 rdd 操作 下面介绍两个 rdd 之间操作的算子

    union 算子

    生成一个包含两个rdd中所有元素的rdd
    python

    >>> rdd = sc.parallelize(['a','b','c'])
    >>> rdd1 = sc.parallelize(['c','d','d'])
    >>> rdd.union(rdd1).collect()
    ['a', 'b', 'c', 'c', 'd', 'd']
    

    intersection 算子

    求两个rdd的共同元素

    >>> rdd = sc.parallelize(['a','b','c'])
    >>> rdd1 = sc.parallelize(['c','d','d'])
    >>> rdd.intersection(rdd1).collect()
    ['c']
    

    这个返回结果会去重
    比如:

    >>> rdd = sc.parallelize(['a','b','c','c'])
    >>> rdd1 = sc.parallelize(['c','d','d'])
    >>> rdd.intersection(rdd1).collect()
    ['c']
    

    rdd 中有两个'c' 通过intersection后仍然只有一个'c'

    subtract 算子

    移除一个 RDD中的内容

    >>> rdd = sc.parallelize(['a','b','c','c','a'])
    >>> rdd1 = sc.parallelize(['c','d','d'])
    >>> rdd.subtract(rdd1).collect()
    ['a', 'a', 'b']
    

    次例中表示从 rdd中移除含rdd1中元素

    cartesian 算子

    计算两个算子的笛卡尔积

    >>> rdd = sc.parallelize(['a','b','c'])
    >>> rdd1 = sc.parallelize(['1','2','3'])
    >>> rdd.cartesian(rdd1).collect()
    [('a', '1'), ('a', '2'), ('a', '3'), ('b', '1'), ('b', '2'), ('b', '3'), ('c', '1'), ('c', '2'), ('c', '3')]
    

    在计算相似度时特别有用 比如rdd是用户集 rdd1上商品集 需要给用户推荐商品 需计算用户与每个商品的推荐指数 就会用到用户集和商品集的笛卡尔乘积。只是特别注意,求大规模的rdd笛卡尔积开销巨大

    以上介绍 rdd 基本算子 下一篇介绍 rdd 行动算子

    glom 算子

    按分区进行分组
    Python

    >>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
    >>> rdd.glom().collect()
    [[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]
    

    说明将数据分为4个分区 第一个分区有数据 1,2 。 第二个分区数据 3,4。第三个分区 5,6 。 第四个分区 7,8,9,10

    repartition 将 RDD 进行重新分区

    Pyhon

    >>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
    >>> rdd.glom().collect()
    [[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]
    >>> rdd.repartition(2).glom().collect()
    [[1, 2, 5, 6, 7, 8, 9, 10], [3, 4]]
    

    repartition 会对RDD 数据进行重新组合比较消息性能,但在某些特定场合配置foreachPartition 使用。会对新能有极大提升

    conf = SparkConf().setMaster("local[*]").setAppName("My Demo")
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    rdd1 = rdd.repartition(2)
    def func(ite):
      for i, value in enumerate(ite):
        # 这里会在executor中执行 这里print 只是个例子 说明怎么使用ite
        print(value)
    
    rdd1.foreachPartition(func)
    

    相关文章

      网友评论

        本文标题:Spar05 RDD 转换算子

        本文链接:https://www.haomeiwen.com/subject/rigsohtx.html