# 1 Common RDD Operations
*Transformations* create a new dataset from an existing one:
RDDA --transformation--> RDDB
RDDB = RDDA.map(...)
[Interview point] Laziness: a transformation does not compute anything immediately; it only records the lineage between RDDs, and the actual computation happens only when an action is triggered.
Examples: map / filter / groupBy / distinct
*Actions* return a value to the driver program after running a computation on the dataset.
Examples: count / reduce / collect
Key points (see the sketch below):
* Transformations are lazy: nothing actually happens until an action is called.
* An action triggers the computation.
* An action returns values to the driver or writes data to external storage.
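A minimal sketch of this behaviour, assuming a `SparkContext` already exists as `sc` (the same assumption the examples below make):
```
rdd = sc.parallelize([1, 2, 3])

# map is a transformation: it only records the lineage, no job runs yet
doubled = rdd.map(lambda x: x * 2)

# collect is an action: only now does Spark launch a job and compute the result
print(doubled.collect())  # [2, 4, 6]
```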
# 2 Transformation Operators
## 1)map
map(func): applies func to every element of the dataset and returns a new distributed dataset.
```
def my_map():
    data = ['cat', 'dog', 'tiger', 'lion']
    rdda = sc.parallelize(data)
    # turn every element into a (element, 1) pair
    rddb = rdda.map(lambda x: (x, 1))
    print(rddb.collect())
```
## 2)filter
filter(func): returns a new distributed dataset containing only the elements for which func returns true.
```
def my_filter():
    data = [1, 2, 3, 4, 5]
    rdd1 = sc.parallelize(data)
    rdd2 = rdd1.map(lambda x: x * 2)
    rdd3 = rdd2.filter(lambda x: x > 5)
    print(rdd3.collect())

    # equivalent, written as a method chain
    print(sc.parallelize(data).map(lambda x: x * 2).filter(lambda x: x > 5).collect())
```
## 3)flatMap
flatMap(func): each input item can be mapped to 0 or more output items (func should return a sequence), and the resulting sequences are flattened.
```
def my_flatMap():
    data = ['hello spark', 'hello world', 'hello world']
    rdd = sc.parallelize(data)
    # split each line into words; the resulting lists are flattened into one RDD of words
    print(rdd.flatMap(lambda line: line.split(" ")).collect())
```
The difference from map: map produces exactly one output element per input element (here that would be a list of words per line), while flatMap flattens each returned sequence into individual elements, as the sketch below shows.
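A small contrast sketch (same `sc` assumption as above) that makes the difference visible:
```
data = ['hello spark', 'hello world']
rdd = sc.parallelize(data)

# map: one output element per input line -> an RDD of lists
print(rdd.map(lambda line: line.split(" ")).collect())
# [['hello', 'spark'], ['hello', 'world']]

# flatMap: each returned list is flattened -> a single RDD of words
print(rdd.flatMap(lambda line: line.split(" ")).collect())
# ['hello', 'spark', 'hello', 'world']
```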
## 4)groupByKey
Groups all values with the same key together.
```
def my_groupBy():
    data = ['hello spark', 'hello world', 'hello world']
    rdd = sc.parallelize(data)
    rdd1 = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
    rdd2 = rdd1.groupByKey()
    # printing directly only shows Spark's ResultIterable objects, not the grouped values
    print(rdd2.collect())
    # convert each group to a list to see the actual values
    print(rdd2.map(lambda x: {x[0]: list(x[1])}).collect())
```
## 5)reduceByKey
Groups values with the same key together and combines them with the given function.
```
def my_reduceByKey():
    data = ['hello spark', 'hello world', 'hello world']
    rdd = sc.parallelize(data)
    rdd1 = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
    rdd2 = rdd1.reduceByKey(lambda a, b: a + b)
    print(rdd2.collect())
```
[Interview point: this is the basic word count. A comparison with groupByKey follows below.]
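A hedged side-by-side sketch: groupByKey followed by a sum gives the same word count, but reduceByKey pre-aggregates within each partition before the shuffle, which is why it is usually preferred for this pattern:
```
pairs = sc.parallelize(['hello spark', 'hello world', 'hello world']) \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda x: (x, 1))

# groupByKey shuffles every (word, 1) pair, then sums on the reduce side
print(pairs.groupByKey().mapValues(sum).collect())

# reduceByKey combines counts on each partition first, then shuffles the partial sums
print(pairs.reduceByKey(lambda a, b: a + b).collect())
```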
## 6) sortByKey()
Sorts by key, ascending by default.
Requirement 1: sort the word count result from largest to smallest count.
```
def my_sortByKey():
    data = ['hello spark', 'hello world', 'hello world']
    rdd = sc.parallelize(data)
    rdd1 = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
    rdd2 = rdd1.reduceByKey(lambda a, b: a + b)
    # swap to (count, word), sort by count descending, then swap back
    rdd3 = rdd2.map(lambda x: (x[1], x[0])).sortByKey(False).map(lambda x: (x[1], x[0]))
    print(rdd3.collect())
```
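An alternative sketch for the same requirement: `sortBy` takes an arbitrary key function, so the double swap is not needed (same data and `sc` assumption as above):
```
def my_sortBy():
    data = ['hello spark', 'hello world', 'hello world']
    rdd = sc.parallelize(data)
    counts = rdd.flatMap(lambda line: line.split(" ")) \
                .map(lambda x: (x, 1)) \
                .reduceByKey(lambda a, b: a + b)
    # sort directly by the count (second element of the pair), descending
    print(counts.sortBy(lambda x: x[1], ascending=False).collect())
```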
## 7) Union
```
def my_union():
    a = sc.parallelize([1, 2, 3])
    b = sc.parallelize([2, 3, 4])
    # union keeps duplicates: [1, 2, 3, 2, 3, 4]
    print(a.union(b).collect())
```
## 8) distinct
```
def my_distinct():
    a = sc.parallelize([1, 2, 3])
    b = sc.parallelize([2, 3, 4])
    # distinct removes the duplicates introduced by union
    print(a.union(b).distinct().collect())
```
## 9) join
Inner join
Outer join: left / right / full
```
def my_join():
    a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1")])
    b = sc.parallelize([("B", "b1"), ("C", "c2"), ("C", "c3")])
    a.join(b).collect()            # inner join
    # [("C", ("c1", "c2")), ("C", ("c1", "c3"))]
    a.leftOuterJoin(b).collect()   # left outer join
    # [("A", ("a1", None)), ("C", ("c1", "c2")), ("C", ("c1", "c3")), ("D", ("d1", None))]
    a.rightOuterJoin(b).collect()  # right outer join
    # [("B", (None, "b1")), ("C", ("c1", "c2")), ("C", ("c1", "c3"))]
    a.fullOuterJoin(b).collect()   # full outer join
    # [("A", ("a1", None)), ("B", (None, "b1")), ("C", ("c1", "c2")), ("C", ("c1", "c3")), ("D", ("d1", None))]
```
# 3 Action Operators
## 1) Common operators
collect, count, take, reduce, saveAsTextFile, foreach
## 2) Example
```
def my_action():
    data = [1, 2, 3, 4, 5, 6, 7]
    rdd = sc.parallelize(data)
    rdd.collect()                    # return all elements to the driver
    rdd.count()                      # number of elements
    rdd.take(3)                      # first three elements: [1, 2, 3]
    rdd.max(); rdd.min(); rdd.sum()  # aggregate values
    rdd.reduce(lambda a, b: a + b)   # fold the elements with a binary function
    rdd.foreach(lambda x: print(x))  # runs on the executors, so output appears in executor logs
    rdd.saveAsTextFile("file:///tmp/output")  # write to storage (the path here is only an example)
```
# 4 Spark RDD in Practice
## 1) Word count case study, iterated on from several angles
1) Input: a single file, multiple files (a directory), different file extensions
2) Development steps:
turn every line of text into individual words: flatMap
word ==> (word, 1): map
add up the counts of identical words to get the final result: reduceByKey
### Version 1: print to the console
```
import sys
from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <input>", file=sys.stderr)
        sys.exit(-1)
    conf = SparkConf()
    sc = SparkContext(conf=conf)

    def printResult():
        counts = sc.textFile(sys.argv[1]) \
            .flatMap(lambda line: line.split("\t")) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(lambda a, b: a + b)
        output = counts.collect()
        for (word, count) in output:
            print("%s:%i" % (word, count))

    printResult()
    sc.stop()
```
Tip: pass the path of the input text as a script argument.
Submit the job from the command line:
```
./spark-submit --master local[2] --name spark0402 /home/hadoop/script/spark0402.py file:///home/hadoop/data/hello.txt
```
The input path supports file pattern matching:
```
file:///home/hadoop/data/hello.txt
file:///home/hadoop/data/
file:///home/hadoop/data/*.txt
```
Tip: copy the file to create more test inputs:
cp ../hello.txt 1
cp ../hello.txt 2
cp ../hello.txt 1.txt
### Version 2: save to a file
```
if len(sys.argv) != 3:
    print("Usage: wordcount <input> <output>", file=sys.stderr)
    sys.exit(-1)

def saveFile():
    sc.textFile(sys.argv[1]) \
        .flatMap(lambda line: line.split("\t")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .saveAsTextFile(sys.argv[2])
```
Tip: inspect the contents of the output partitions:
```
more part-0000*
```
### Exercise: sort the results in descending order
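One possible sketch for this exercise, built on the version 2 skeleton above (the argument handling is the same assumption as in version 2, and the helper name is only illustrative):
```
def saveSortedDesc():
    sc.textFile(sys.argv[1]) \
        .flatMap(lambda line: line.split("\t")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda x: x[1], ascending=False) \
        .saveAsTextFile(sys.argv[2])
```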
## 2) Top N case study
1) Input: one or many files (a directory), any extension
2) Goal: the top N values of some dimension
3) Development steps:
extract the field you need from every line of text: map
field ==> (field, 1): map
add up the counts of identical fields to get the totals: reduceByKey
sort by count in descending order and take the top N: sortByKey
```
# split each line into fields and count the sixth field (x[5]); sortByKey(False) sorts descending
output = sc.textFile(sys.argv[1]) \
    .map(lambda line: line.split("\t")) \
    .map(lambda x: (x[5], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .map(lambda x: (x[1], x[0])) \
    .sortByKey(False) \
    .map(lambda x: (x[1], x[0])) \
    .take(5)
for (word, count) in output:
    print("%s:%i" % (word, count))
```
Food for thought: what happens if the data being aggregated is skewed, i.e. a few keys are much hotter than the rest? (One mitigation is sketched below.)
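One common mitigation, shown only as a sketch (the salt range of 10 is an arbitrary assumption): two-stage aggregation with salted keys, so a single hot key is spread across several reducers before the final merge:
```
import random

pairs = sc.textFile(sys.argv[1]) \
    .map(lambda line: line.split("\t")) \
    .map(lambda x: (x[5], 1))

# stage 1: prefix each key with a random salt and pre-aggregate the salted keys
salted = pairs.map(lambda kv: ("%d_%s" % (random.randint(0, 9), kv[0]), kv[1])) \
              .reduceByKey(lambda a, b: a + b)

# stage 2: strip the salt and merge the partial counts per original key
counts = salted.map(lambda kv: (kv[0].split("_", 1)[1], kv[1])) \
               .reduceByKey(lambda a, b: a + b)
```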
## 3) Average value case study
Development steps:
1) extract the age field: map
2) sum the ages: reduce
3) count the number of records: count
4) divide the total by the count to get the average
```
# the age is assumed to be the second tab-separated field of each line
ageData = sc.textFile(sys.argv[1]) \
    .map(lambda x: x.split("\t")[1])
totalAge = ageData.map(lambda age: int(age)).reduce(lambda a, b: a + b)
counts = ageData.count()
avgAge = totalAge / counts
print(avgAge)
```
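A small follow-up sketch: `ageData` is consumed by two separate actions (reduce and count), so without caching the input file is read and parsed twice; a `cache()` call avoids that (this optimization is an addition, not part of the original steps):
```
ageData = sc.textFile(sys.argv[1]) \
    .map(lambda x: int(x.split("\t")[1])) \
    .cache()  # reused by both reduce() and count() below

totalAge = ageData.reduce(lambda a, b: a + b)
counts = ageData.count()
print(totalAge / counts)
```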