美文网首页Spark学习笔记
3.Spark学习(Python版本):Spark RDD编程基

3.Spark学习(Python版本):Spark RDD编程基

作者: 马淑 | 来源:发表于2018-08-11 14:26 被阅读51次

    Spark中针对RDD的操作包括创建RDD、RDD转换操作和RDD行动操作。

    Step1. 启动HDFS和Spark
    mashu@mashu-Inspiron-5458:~$ cd /usr/local/hadoop
    mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./sbin/start-dfs.sh
    mashu@mashu-Inspiron-5458:/usr/local/hadoop$ cd /usr/local/spark
    mashu@mashu-Inspiron-5458:/usr/local/spark$ ./bin/pyspark
    
    Step2. 新建rdd文件夹以存储本案例资源
    mashu@mashu-Inspiron-5458:~$ cd /usr/local/spark/python_code
    mashu@mashu-Inspiron-5458:/usr/local/spark/python_code$ mkdir rdd
    

    在rdd目录下新建一个word.txt文件,你可以在文件里面随便输入几行英文语句用来测试。

    Step3-1. RDD创建 - 从本地创建

    RDD可以通过两种方式创建:

    • 第一种:读取一个外部数据集。比如,从本地文件加载数据集,或者从HDFS文件系统、HBase、Amazon S3等外部数据源中加载数据集。
    • 第二种:调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。

    从本地创建方法:

    >>> line = sc.textFile('file:///usr/local/spark/python_code/rdd/word_rdd.txt')
    >>> line.first()
    'I am learning Spark RDD' 
    
    Step3-2. RDD创建 - 从HDFS创建

    在HDFS用户根目录下创建rdd文件夹,将本地的word_rdd.txt上传到这里。

    mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -mkdir /user/mashu/rdd
    mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -ls .
    Found 2 items
    drwxr-xr-x   - mashu supergroup          0 2018-08-11 15:09 rdd
    -rw-r--r--   1 mashu supergroup         41 2018-08-11 14:06 word.txt
    
    mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -put /usr/local/spark/python_code/rdd/word_rdd.txt ./rdd/
    mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -ls ./rdd
    Found 1 items
    -rw-r--r--   1 mashu supergroup         65 2018-08-11 15:13 rdd/word_rdd.txt
    

    在HDFS中创建RDD时,以下3个命令等价:
    line = sc.textFile('hdfs://localhost:9000/user/mashu/rdd/word_rdd.txt')
    line = sc.textFile('./rdd/word_rdd.txt')
    line = sc.textFile('/user/mashu/rdd/word_rdd.txt')

    >>> line = sc.textFile('hdfs://localhost:9000/user/mashu/rdd/word_rdd.txt')
    >>> line.first()
    'I am learning Spark RDD'
    >>> line = sc.textFile('./rdd/word_rdd.txt')
    >>> line.first()
    'I am learning Spark RDD'
    >>> line = sc.textFile('/user/mashu/rdd/word_rdd.txt')
    >>> line.first()
    'I am learning Spark RDD'
    
    Step3-3. RDD创建 - 通过并行集合(数组)创建RDD

    可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。

    >>> nums = [1,2,3,4,5]
    >>> rdd = sc.parallelize(nums)
    >>> rdd.collect()
    [1, 2, 3, 4, 5]
    
    Step4. RDD操作

    RDD被创建好以后,在后续使用过程中一般会发生两种操作:

    • 转换(Transformation): 基于现有的数据集创建一个新的数据集。
    • 行动(Action):在数据集上进行运算,返回计算值。

    每一次RDD转换操作都会产生不同的RDD,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。

    Step4-1. RDD操作 - 转换操作
    一些常见的转换操作(Transformation API):
    • filter(func):筛选出满足函数func的元素,并返回一个新的数据集
    • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
    • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
    • groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
    • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
    Step4-2. RDD操作 - 行动操作
    一些常见的行动操作(Action API):
    • count() 返回数据集中的元素个数
    • collect() 以数组的形式返回数据集中的所有元素
    • first() 返回数据集中的第一个元素
    • take(n) 以数组的形式返回数据集中的前n个元素
    • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
    • foreach(func) 将数据集中的每个元素传递到函数func中运行
    Step4-3. RDD操作 - 实例

    filter();collect()

    >>> lines = sc.textFile('./rdd/word_rdd.txt')
    >>> lines.filter(lambda line: 'Spark' in lines).count()
    3
    >>> lines.collect()
    ['I am learning Spark RDD', 'Spark is faster than Hadoop', 'I love Spark']
    

    map();reduce()

    找出文本文件中单行文本所包含的单词数量的最大值

    >>> lines = sc.textFile('./rdd/word_rdd.txt')
    >>> lines.map(lambda line:len(line.split(' '))).reduce(lambda a,b:)
    5
    

    p.s.对三目运算 a>b and a or b的逻辑理解不了

    Step5. 持久化

    RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果我们需要多次调用不同的行动操作,都会触发一次从头开始的计算。特别对于迭代计算而言,代价很大。

    >>> list = ["Hadoop","Spark","Hive"]
    >>> rdd = sc.parallelize(list)
    >>> print(rdd.count()) #行动操作,触发一次真正从头到尾的计算
    3
    >>> print(','.join(rdd.collect())) #行动操作,触发一次真正从头到尾的计算
    Hadoop,Spark,Hive
    

    上面代码执行过程中,前后共触发了两次从头到尾的计算。
    实际上,可以通过持久化(缓存)机制避免这种重复计算的开销。可以使用persist()方法对一个RDD标记为持久化。具体操作如下:

    >>> list = ["Hadoop","Spark","Hive"]
    >>> rdd = sc.parallelize(list)
    >>> rdd.cache()  #会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
    ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:175
    >>> print(rdd.count()) #第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
    3
    >>> print(','.join(rdd.collect())) #第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
    Hadoop,Spark,Hive
    
    Step6. 分区

    RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区。
    对于不同的Spark部署模式而言,都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:

    • 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;
    • Apache Mesos:默认的分区数为8;
    • Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值;

    因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism,比如:

    >>> array = [1,2,3,4,5]
    >>> rdd = sc.parallelize(array,2) #设置两个分区
    

    对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。
    如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)。

    Step7. 打印元素
    本地:

    rdd.foreach(print)或者rdd.map(print)

    集群上:

    采用集群模式执行时,在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点Driver Program中,因此,任务控制节点Driver Program中的stdout是不会显示打印语句的这些输出内容的。为了能够把所有worker节点上的打印输出信息也显示到Driver Program中,可以使用collect()方法:
    rdd.collect().foreach(print) #会抓取各个worker节点上的所有RDD元素,这能会导致内存溢出
    rdd.take(100).foreach(print) #用于只需要打印RDD的部分元素时

    相关文章

      网友评论

        本文标题:3.Spark学习(Python版本):Spark RDD编程基

        本文链接:https://www.haomeiwen.com/subject/byrdbftx.html