Spark Chapter 4 RDD 常用算子

作者: 深海suke | 来源:发表于2019-08-12 21:47 被阅读0次

# 1 RDD 常用操作

*transformations*, which create a new dataset from an existing one

RDDA ---transformations-->RDDB

RDDB = RDDA.map(...)

【面试重点】lazy——transformations 不会立刻计算内容,只会记录关系,只有遇到action才会进行计算。

操作包含:map/filter/gtoup by/distinct

*actions*, which return a value to the driver program after running a computation on the dataset.

操作包含:count,reduce、collect/^

【特点】

* transformation are lazy,nothing actually happens until an action is called;

* action triggers the computation;

* action returns values to driver or writes data to external storage;

# 2 Transformation算子

## 1map

map(func):将func函数作用到每个数据集上面,生成一个新的分布式数据集返回

```

def my_map2():

    data = ['cat','dog','tiger','lion']

    rdda = sc.parallelize(data)

    rddb = rdda.map(lambda x:(x,1))

    print(rddb.collection())

```

## 2filter

选出所有func返回值为true的值的元素,生成一个新的分布式数据集返回

```

def my_map():

    data = [1,2,3,4,5]

    rdd1 = sc.parallelize(data)

    rdd2 = rdd1.map(lambda x:x*2)

    rdd3 = rdd2.filter(lambda x:x>5)

    print(rdd3.collect())

<==>

# 链式编程

 print(sc.parallelize(data).map(lambda x:x*2).filter(lambda x:x>5).collect())

```

## 3flatMap

输入的item能够被map到0or多个items输出,返回值是一个Sequence

```

def my_flatMap():

  data = ['hello spark','hello world','hello world']

  rdd = sc.parallelize(data)

  print(rdd.flatMap(lambda line:line.split(" ")).collection())

```

 不是很懂这个区别

## 4groupByKey

把相同的Key的数据分发到一起

```

def my_groupBy():

  data = ['hello spark','hello world','hello world']

  rdd = sc.parallelize(data)

  rdd1 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))

  rdd2 = rdd1.groupByKey()

  print(rdd2.collect())#直接输出会输出spark的参数,无法直接输出值

  print(rdd2.map(lambda x:{x[0]:list(x[1])}).collect()) #应该使用这种方式实现数据输出

```

## 5reduceByKey

把相同的Key的数据分发到一起,并进行相应的计算

```

def my_groupBy():

  data = ['hello spark','hello world','hello world']

  rdd = sc.parallelize(data)

  rdd1 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))

  rdd2 = rdd1.reduceByKey(lambda a,b:a+b)

  print(rdd2.collect())

```

【面试考点:基础版本word count】

## 6) sortByKey()

默认升序列

需求1 :把wc的结果按照从大到小排列

```

def my_groupBy():

  data = ['hello spark','hello world','hello world']

  rdd = sc.parallelize(data)

  rdd1 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))

  rdd2 = rdd1.reduceByKey(lambda a,b:a+b)

  rdd3 = rdd2.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0]))

  print(rdd3.collect())

```

## 7) Union

```

def my_union():

  a = sc.parallize([1,2,3])

  b = sc.parallize([2,3,4])

  a.union(b).collect()

```

## 8) distinct

```

def my_distinct():

  a = sc.parallize([1,2,3])

  b = sc.parallize([2,3,4])

  a.union(b).distinct().collect()

```

## 9) join

inner join

outer join:left/right/full

```

def my_join():

  a = sc.parallize([("A","a1"),("C","c1"),("D","d1")])

  b = sc.parallize([("B","b1"),("C","c2"),("C","c3")])

  a.join(b).collect() #这个表示内连接

  #结果[("C",("c1","c2")),("C",("c1","c3"))]

  a.leftOuterJoin(b).collect() #左外链接

  #结果:[("A",("a1",None)),("C",("c1","c2")),("C",("c1","c3")),("D",("d1",None))]

  a.RightOuterJoin(b).collect() #右外链接

  #结果:[("B",(None,"b1")),("C",("c1","c2")),("C",("c1","c3"))]

  a.fullOuterJoin(b).collection() #全连接

  #结果:[("A",("a1",None)),("B",(None,"b1")),("C",("c1","c2")),("C",("c1","c3")),("D",("d1",None))]

```

# 3 Action算子

## 1) 常用算子

collect、count、take、reduce、saveAsTextFile、foreach

## 2) 示例

```

def my_action():

  data = [1,2,3,4,5,6,7]

  rdd = sc.parallize(data)

  rdd.collect() #显示

  rdd.count() #计数

  rdd.take(3) #取前三个[1,2,3]

  rdd.max()/rdd.min()/rdd.sum() #集合计算

  rdd.reduce(lambda a,b:a+b)

  rdd.foreach(lambda x:print(x))

  rdd.saveAsTextFile() #文件存储

```

# 4 Spark RDD实战

## 1)词评统计案例,多角度迭代

1)input:1个文件,多个文件(文件夹),不同后缀名

2)开发步骤分析:

文本内容的每一行转成一个个的单词:flatMap

单词==>(单词,1):map

把所有单词相同的计数相加,得到最终的结果:reduceByKey

### 版本1 写到控制台

```

from pyshark import SparkConf,SparkContext

if __name__ = "__main__":

    if len(sys.srgv)!=2:

        print("Usage:wordcount",file = sys.stderr)

    conf = SparkConf()

    sc = SparkContext(conf = conf)

    def printResult():

        counts = sc.textFile(sys.argv[1])\

            .flatMap(lambda line:line.split("\t"))\

            .map(lambda x:(x,1))\

            .reduceByKey(lambda a,b:a+b)

        output = counts.collect()

        for(word,count) in output:

            print("%s:%i"%(word,count))

    printResult()

    sc.stop()

```

tips: 在脚本的参数中添加文本地址的参数 传入相关脚本参数

在命令行提交代码:

```

./spark -submit --master local[2] --name spark0402 /home/hadoop/script/spark0402.py file:///home/hadoop/data/hello.txt

```

支持文件匹配模式

```

1 file:///home/hadoop/data/hello.txt

2 file:///home/hadoop/data/

3 file:///home/hadoop/data/*.txt

```

tips:

复制文件

cp ../hello.txt 1

cp ../hello.txt 2

cp ../hello.txt 1.txt

### 版本2 保存到文件

```

if len(sys.srgv)!=3:

    print("Usage:wordcount ",file = sys.stderr)

    sys.exit(-1)

def saveFile():

    sc.textFile(sys.argv[1]) \

        .flatMap(lambda line: line.split("\t")) \

        .map(lambda x: (x, 1)) \

        .reduceByKey(lambda a, b: a + b).\

        .saveAsTextFile(sys.argv[2])

```

tips: 查看分区的内容

```

more part-0000*

```

### 作业:降序排列

## 2Top N 案例实战

1)input:1/n文件文件夹后缀名

2)求某个维度的topn

3)开发步骤分析

文本内容的每一行根据需求提出你所需要的字段

单词==>(单词,1):map

把所有单词相同的计数相加,得到最终的结果:reduceByKey

取最多出现次数的降序:sortByKey

```

counts = sc.textFile(sys.argv[1])\

        .flatMap(lambda line:line.split("\t"))\

         .map(lambda x:(x[5],1))\

         .reduceByKey(lambda a,b:a+b)\

         .map(lambda x:(x[1],x[0]))\

         .sortByKey()\

         .map(lambda x:(x[1],x[0])).take(5)

for(word,count) in output:

  print("%s:%i"%(word,count))         

```

思考:如果统计数据会发生数据倾斜

## 3)平均数案例实战

开发步骤分析:

1)取出年龄map

2)计算年龄总和reduce

3)计算记录综述count

4)求平均数

```

ageData = sc.textFile(sys.argv[1])\

        .flatMap(lambda x:x.split("\t")[1])\

totalAge = ageData.map(age:int(age)).reduce(a,b:a+b)

counts = ageData.count()       

avgAge = totalAge/counts

print(ageAge) 

```

相关文章

网友评论

    本文标题:Spark Chapter 4 RDD 常用算子

    本文链接:https://www.haomeiwen.com/subject/kgwpjctx.html