2020/06/30 -
2020/06/30 21:30形成初稿
引言
本章节主要介绍关于RDD两种基础操作。对于一般的spark数据分析应用而言,大致的流程如下:
- 创建RDD(从外界导入数据);
- 对RDD进行一系列的操作;
- 最后将RDD的结果保存;
本章将按照《Learning Spark》(2015年第一版)中的脉络来梳理这部分内容。
实际上,对于结构化的数据来说,Spark支持利用DataFrame的形式来处理数据,但是这本书并没有提到(出版也比较早了),主要还是利用RDD来进行。如果是非结构化的数据,还是要使用RDD的数据形式来处理更好;而且现在使用这些东西都是使用sparksession来操作了。。。
RDD是Spark的核心数据,通过对RDD的操作来完整整个数据操作流程;RDD具备两种操作,并通过惰性计算来进行区分。为了提高应用的速度,可以将某些RDD进行持久化。这也是为什么Spark能够快速运算的一个原因,就是可以将数据存储在内存中多次运算。
(后期要记录一下Spark的优势)
下面的内容主要从一下几个部分介绍
- RDD是什么
- RDD支持的操作
- transformations操作简介
- actions操作简介
- 常见的transformations和actions操作
- 持久化,cache
- 小节
1. RDD是什么
RDD是spark中的一个核心概念,全称是“the resilient distributed dataset”。它代表着数据在多台机器上的分布式存储形式,在当前我的理解中,和hadoop中处理数据的方式是一样的,都是多台机器(进程)同时阅读文件的不同块(以hdfs为文件系统),然后每个机器分别持有了大文件的某小块。
创建RDD一般分为两种方式:从外界加载数据以及从driver中将集合类数据分布为RDD。
创建RDD最简单的方法,从外界加载数据,就是one_rdd = sc.textFile("xxx.file")
。
另外一种方法,通过并行分布式化,one_rdd = sc.parallelize(['pandas', 'i like pandas'])
2. RDD支持的操作
RDD支持两种操作,一种是transformations,另一种是actions。看到有一些网站中对这两个操作的翻译是转化和行动。我在这篇文章中还是按照英文来说明具体的信息。transformations是指在一个已有的RDD上生成新的RDD,这两个RDD之间只有这个操作作为依赖关系;而actions是指对数据进行一些计算返回一个结果,可以将其返回driver或者保存到外界文件系统。
transformations和actions是两种完全不以言的操作,他们的主要区别在与操作是否马上执行。actions操作是马上执行的,但是transformations操作只有在生成的最终的RDD上执行actions的时候才会执行。这种操作被成为lazy evaluation,惰性计算。
(穿插一个小问题,前面sc.textFile在执行之后并不是真的进行,他也会等actions执行的时候才执行,这也是有时候为什么两次count时间不一样)。
简单总结就是,transformations返回新的RDD,actions执行操作,并返回结果。
在使用textFile读取文件,然后执行第一个actions之后,这部分rdd会被加载到内存中,按说也是持久化到内存了,但是为什么4040端口上的storage中不会显示,只有执行了persist()之后才会显示,我不是很理解。对于利用transformations生成的RDD,如果不使用persist(),那么每次你对这个RDD进行actions操作,他都会完全重新从原始的RDD上计算全部的tranformations之后来进行。
关于这些内容,后面还会具体展开说明。
3. transformations操作简介
transformations是惰性操作,惰性操作只有在执行一次action操作之后才会实现;很多transformations操作都是按照元素来进行的,但不是绝对的。transformations操作并不在原始的RDD上进行修改,而是生成一个指针,指向新的RDD,按照这种方式,生成一个DAG(这部分信息可以在4040端口上看到)。下面来简单列举一些操作。
-
filter(func)
python中传递函数过去,只要使用可以序列化的类型即可;从文章的示例来看,一般函数的返回值都是布尔量,是否正确,只有正确的被留下。 -
union(rdd)
union是操作另一个rdd的函数,通过与另外一个RDD相互作用,生成新的RDD。这个应该没有数据是否结构化的限制,只要都是RDD就可以拼接在一起。
4. actions操作简介
actions将强制transformations操作执行。常见的操作有,take(),collect();collect()是将整个RDD的数据都返回,一般大数据的时候不要这样操作,我记得我又一次就这样把程序搞崩了。
5. 常见的transformations和actions操作
transformations
transformations可以分为两种,一种是面向元素级别的,也就是对每个元素都进行操作;一种是面向RDD的,也就是参数也是RDD。常见的操作map,正如他名字所表明,map就是对所有的元素都进行操作;我之前的时候利用pandas的map进行过滤,他返回的是一个布尔值的列表,然后我在spark上测试了一些也是一个布尔型列表,那么如此说来,他就是创建了一个新的RDD,RDD的数据类型是完全你来定义的,可以不跟原始的rdd数据类型一致。filter,前面已经提到过,按照传递进去的函数结果是否为正来返回结果。flapmap会将返回的列表类型的数据展开,这样数量上来看就更多了。
两个RDD的操作,这部分操作大致包括组合,交叉等等。具体可以根据业务逻辑来学习。
元素级别的RDD 两个RDD之间的操作
actions
actions的第一个操作是reduce,但是单纯看这个书上的内容,我根本看不懂,虽然可以自己说服自己好像是这么回事,但是再提到fold和agg就完全看不懂了。reduce对RDD所有的元素执行一个操作,并将这些操作全部都加在一起,或者怎么样吧;而fold是提供了一个初始值。但是agg就有些看不懂了。
首先来说明reduce就是将操作在RDD上所有的内容都执行一遍,最好理解的方式就是累加,他的参数是一个函数,这个函数有两个参数,分别是x和y。我不知道为什么书上没有说明这个东西。我是在文章[1]中看到的解释。x实际上是这个函数之前运行的返回值,而y是所有的rdd的值。在fold的情况下,就是先给x进行一个赋值。而reduce和fold都只能返回与元素数据类型一样的数据,这个时候就需要aggregate来完成后续的操作了,它可以返回与RDD数据类型不一样的东西,比如元组,具体的解释也可以看文章[1]。
6. 持久化,cache
持久化我来翻译一下这句话。这里的第一个重要的观点是,假设一个RDD是你从另外一个RDD上创建来的,也就是说他们之间存在依赖,这个时候,如果你不对这个新的RDD进行持久化,那么你每次进行actions操作的时候,系统都会将这些操作重新计算。然后如果你进行了持久化,那么在进行第一次actions计算之后,spark会将这部分内容存储在内存中。而且要进行的操作是count这种,take(10)只会持久化一部分。
这些内容是持久化的核心部分。
我当时测试这部分内容的时候,就是通过在textFile生成的RDD进行操作来测试。
测试过程
对于如果执行了持久化操作的RDD,是可以在4040端口的storage中看到具体的对象的。持久化的方式也有多种,包括硬盘以及内存等,这里暂时不涉及。
另外一个比较重要的知识是,如果缓存了非常多的数据之后,当内存无法装下的时候,spark自己会将一些比较老的数据进行老化。
2020/07/05 -
在学习第四章内容的时候,我有对这个cache产生了疑惑,我已经知道如果是从别的RDD上创建RDD是重新计算的,但是textFile会自动生成缓存吗?从运行时间上来看,如果你对一个从textFile获取的RDD进行操作(中间没有别的操作),第一次count,会明显比第二次count的时间长。这好像意味着textFile就是把数据读入了内存。但是storage中并没有看到这部分内容,只有执行了persist之后才能出现,所以这里我很疑惑。而且从前面的理解来说,这种textFile也只是惰性操作,所以正常来理解,他的确实应该读两次。我个人感觉,可能这种比较特殊,就是读入内存中的过程。感觉如果想弄明白,得仔细地去看他的那个DAG还有各种时间线。
所以这里的问题是,如果属于我要多次操作的东西,那么textFile之后需要缓存吗?我的建议是,这里不是很明确,最好还是缓存吧。特别是那种使用多次的东西。第四章的例子中,就在读取了文件之后,就缓存了,他就是一个要多次使用这个数据的例子。
文章[2]中第二个回答比较好,就是说,如果你产生了这种RDD形式的分支,那么这个是应该缓存。
我还是感觉非常疑惑,最大的问题就是在这个textFile上,为啥他没有显示process_local呢?
而且,我甚至发现一个这样的事,就是我利用filter过滤已经读取的rdd(没有任何过滤操作,就是直接返回),结果这个RDD执行操作的,比原来的还快,而且他们都是cache的。。。这我就更疑惑了。
7. 小节
关于spark的RDD操作大致就是这些,我觉得那些基础的操作平时用的还是说比较少,大部分时候还是以来的与结构化的数据,所以很多时候都用不到;我印象中,我好像都没有用到,我倒是用到了pair类型的rdd操作。他们的操作应该是有重叠的。
这里比较关键的,也是对性能提升比较高的,是将数据持久化,我觉得这部分后续应该是同时来学习网页上的实时信息,比如storage还有dag那些东西,这样能够更好的提高性能,但是这部分我还没有看。
参考文章
[1]Spark的fold()和aggregate()函数
[2](Why) do we need to call cache or persist on a RDD
·
网友评论