Spark中针对RDD的操作包括创建RDD、RDD转换操作和RDD行动操作。
Step1. 启动HDFS和Spark
mashu@mashu-Inspiron-5458:~$ cd /usr/local/hadoop
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./sbin/start-dfs.sh
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ cd /usr/local/spark
mashu@mashu-Inspiron-5458:/usr/local/spark$ ./bin/pyspark
Step2. 新建rdd文件夹以存储本案例资源
mashu@mashu-Inspiron-5458:~$ cd /usr/local/spark/python_code
mashu@mashu-Inspiron-5458:/usr/local/spark/python_code$ mkdir rdd
在rdd目录下新建一个word.txt文件,你可以在文件里面随便输入几行英文语句用来测试。
Step3-1. RDD创建 - 从本地创建
RDD可以通过两种方式创建:
- 第一种:读取一个外部数据集。比如,从本地文件加载数据集,或者从HDFS文件系统、HBase、Amazon S3等外部数据源中加载数据集。
- 第二种:调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。
从本地创建方法:
>>> line = sc.textFile('file:///usr/local/spark/python_code/rdd/word_rdd.txt')
>>> line.first()
'I am learning Spark RDD'
Step3-2. RDD创建 - 从HDFS创建
在HDFS用户根目录下创建rdd文件夹,将本地的word_rdd.txt上传到这里。
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -mkdir /user/mashu/rdd
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -ls .
Found 2 items
drwxr-xr-x - mashu supergroup 0 2018-08-11 15:09 rdd
-rw-r--r-- 1 mashu supergroup 41 2018-08-11 14:06 word.txt
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -put /usr/local/spark/python_code/rdd/word_rdd.txt ./rdd/
mashu@mashu-Inspiron-5458:/usr/local/hadoop$ ./bin/hdfs dfs -ls ./rdd
Found 1 items
-rw-r--r-- 1 mashu supergroup 65 2018-08-11 15:13 rdd/word_rdd.txt
在HDFS中创建RDD时,以下3个命令等价:
line = sc.textFile('hdfs://localhost:9000/user/mashu/rdd/word_rdd.txt')
line = sc.textFile('./rdd/word_rdd.txt')
line = sc.textFile('/user/mashu/rdd/word_rdd.txt')
>>> line = sc.textFile('hdfs://localhost:9000/user/mashu/rdd/word_rdd.txt')
>>> line.first()
'I am learning Spark RDD'
>>> line = sc.textFile('./rdd/word_rdd.txt')
>>> line.first()
'I am learning Spark RDD'
>>> line = sc.textFile('/user/mashu/rdd/word_rdd.txt')
>>> line.first()
'I am learning Spark RDD'
Step3-3. RDD创建 - 通过并行集合(数组)创建RDD
可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。
>>> nums = [1,2,3,4,5]
>>> rdd = sc.parallelize(nums)
>>> rdd.collect()
[1, 2, 3, 4, 5]
Step4. RDD操作
RDD被创建好以后,在后续使用过程中一般会发生两种操作:
- 转换(Transformation): 基于现有的数据集创建一个新的数据集。
- 行动(Action):在数据集上进行运算,返回计算值。
每一次RDD转换操作都会产生不同的RDD,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
Step4-1. RDD操作 - 转换操作
一些常见的转换操作(Transformation API):
filter(func):筛选出满足函数func的元素,并返回一个新的数据集
map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
Step4-2. RDD操作 - 行动操作
一些常见的行动操作(Action API):
count() 返回数据集中的元素个数
collect() 以数组的形式返回数据集中的所有元素
first() 返回数据集中的第一个元素
take(n) 以数组的形式返回数据集中的前n个元素
reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每个元素传递到函数func中运行
Step4-3. RDD操作 - 实例
filter()
;collect()
>>> lines = sc.textFile('./rdd/word_rdd.txt')
>>> lines.filter(lambda line: 'Spark' in lines).count()
3
>>> lines.collect()
['I am learning Spark RDD', 'Spark is faster than Hadoop', 'I love Spark']
map()
;reduce()
找出文本文件中单行文本所包含的单词数量的最大值
>>> lines = sc.textFile('./rdd/word_rdd.txt')
>>> lines.map(lambda line:len(line.split(' '))).reduce(lambda a,b:)
5
p.s.对三目运算 a>b and a or b
的逻辑理解不了
Step5. 持久化
RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果我们需要多次调用不同的行动操作,都会触发一次从头开始的计算。特别对于迭代计算而言,代价很大。
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> print(rdd.count()) #行动操作,触发一次真正从头到尾的计算
3
>>> print(','.join(rdd.collect())) #行动操作,触发一次真正从头到尾的计算
Hadoop,Spark,Hive
上面代码执行过程中,前后共触发了两次从头到尾的计算。
实际上,可以通过持久化(缓存)机制避免这种重复计算的开销。可以使用persist()方法对一个RDD标记为持久化。具体操作如下:
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> rdd.cache() #会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
ParallelCollectionRDD[24] at parallelize at PythonRDD.scala:175
>>> print(rdd.count()) #第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
3
>>> print(','.join(rdd.collect())) #第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive
Step6. 分区
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区。
对于不同的Spark部署模式而言,都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:
- 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;
- Apache Mesos:默认的分区数为8;
- Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值;
因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism,比如:
>>> array = [1,2,3,4,5]
>>> rdd = sc.parallelize(array,2) #设置两个分区
对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。
如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)。
Step7. 打印元素
本地:
rdd.foreach(print)
或者rdd.map(print)
集群上:
采用集群模式执行时,在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点Driver Program中,因此,任务控制节点Driver Program中的stdout是不会显示打印语句的这些输出内容的。为了能够把所有worker节点上的打印输出信息也显示到Driver Program中,可以使用collect()方法:
rdd.collect().foreach(print) #会抓取各个worker节点上的所有RDD元素,这能会导致内存溢出
rdd.take(100).foreach(print) #用于只需要打印RDD的部分元素时
网友评论