美文网首页
Spark开发--Scala--编程--示例(十)

Spark开发--Scala--编程--示例(十)

作者: 无剑_君 | 来源:发表于2019-12-17 13:46 被阅读0次

参考网站:https://spark.apache.org/examples.html

一、 词频数统计

统计一个或者多个文件中单词出现的次数。
分析:
  对于词频数统计,用 Spark 提供的算子来实现,我们首先需要将文本文件中的每一行转化成一个个的单词, 其次是对每一个出现的单词进行记一次数,最后就是把所有相同单词的计数相加得到最终的结果。
  对于第一步我们自然的想到使用 flatMap 算子把一行文本 split 成多个单词,然后对于第二步我们需要使用 map 算子把单个的单词转化成一个有计数的 Key-Value 对,即 word -> (word,1). 对于最后一步统计相同单词的出现次数,我们需要使用 reduceByKey 算子把相同单词的计数相加得到最终结果。
数据:

hello
spark
hadoop
hadoop
sqoop
sqoop
spark
  1. 上传数据文件到hdfs
root@master:~# vi wordcount
root@master:~# hadoop fs -put wordcount /wordcount

函数说明:
  reduceByKey的作用对象是(key, value)形式的RDD,而reduce有减少、压缩之意,reduceByKey的作用就是对相同key的数据进行处理,最终每个key只保留一条记录。
  保留一条记录通常有两种结果。一种是只保留我们希望的信息,比如每个key出现的次数。第二种是把value聚合在一起形成列表,这样后续可以对value做进一步的操作
  以上面的数据集为例,在spark中比如是word:RDD[(String, Int)] 两个字段分别是word、单个单词在不同文件中出现的次数,现在我们需要统计每个单词出现的总次数。
我们可以这样写:

val word = rdd1.reduceByKey((x,y) => x+y)

对上述的写法简化一下:

val word= rdd1.reduceByKey(_+_)

读取文件:

sc.textFiles(path) 

能将path 里的所有文件内容读出,以文件中的每一行作为一条记录的方式,文件的每一行 相当于 List中以 “,”号 隔开的一个元素,因此可以在每个partition中用for i in data的形式遍历处理Array里的数据;
参数:
path: String 是一个URI,這个URI可以是HDFS、本地文件(全部的节点都可以),或者其他Hadoop支持的文件系统URI返回的是一个字符串类型的RDD,也就是是RDD的内部形式是

Iterator[(String)] minPartitions=  math.min(defaultParallelism, 2)

是指定数据的分区,如果不指定分区,当你的核数大于2的时候,不指定分区数那么就是 2,当你的数据大于128M时候,Spark是为每一个快(block)创建一个分片(Hadoop-2.X之后为128m一个block)。
从当前目录读取一个文件:

val path = "Current.txt"  // 当前文件
val rdd1 = sc.textFile(path,2)

从当前目录读取一个Current.txt的文件
从当前目录读取多个文件:

val path = "Current1.txt,Current2.txt,"  // 当前文件
val rdd1 = sc.textFile(path,2)

从当前读取两个文件,分别是Cuttent1.txt和Current2.txt
从本地系统读取一个文件:

val path = "file:///root/test/README.md"  // 本地文件
val rdd1 = sc.textFile(path,2)

从本地系统读取整个文件夹:


val path = "file:///root/test/"  // 本地文件
val rdd1 = sc.textFile(path,2)

从本地系统中读取licenses这个文件夹下的所有文件這里特别注意的是,比如這个文件夹下有35个文件,上面分区数设置是2,那么整个RDD的分区数是35*2,這是错误的,這个RDD的分区数不管你的partition数设置为多少时,只要license這个文件夹下的這个文件a.txt(比如有a.txt)没有超过128m,那么a.txt就只有一个partition。那么就是说只要这35个文件其中没有一个超过128m,那么分区数就是 35个。

二、求平均年龄

  我们将假设我们需要统计一个 1000 万人口的所有人的平均年龄,当然如果您想测试 Spark 对于大数据的处理能力,您可以把人口数放的更大,比如 1 亿人口,当然这个取决于测试所用集群的存储容量。假设这些年龄信息都存储在一个文件里,并且该文件的格式如下,第一列是 ID,第二列是年龄。
数据格式:

1 58
2 99
3 36
4 84
5 42
6 94
7 2
8 96
9 82
10 81

分析:
  要计算平均年龄,那么首先需要对源文件对应的 RDD 进行处理,也就是将它转化成一个只包含年龄信息的 RDD,其次是计算元素个数即为总人数,然后是把所有年龄数加起来,最后平均年龄=总年龄/人数。
第一步我们需要使用 map 算子把源文件对应的 RDD 映射成一个新的只包含年龄数据的 RDD,很显然需要对在 map 算子的传入函数中使用 split 方法,得到数组后只取第二个元素即为年龄信息;
第二步计算数据元素总数需要对于第一步映射的结果 RDD 使用 count 算子;
第三步则是使用 reduce 算子对只包含年龄信息的 RDD 的所有元素用加法求和;最后使用除法计算平均年龄即可。

三、求男女最高与最低身高

  本案例假设我们需要对某个省的人口 (1 亿) 性别还有身高进行统计,需要计算出男女人数,男性中的最高和最低身高,以及女性中的最高和最低身高。本案例中用到的源文件有以下格式, 三列分别是 ID,性别,身高 (cm)。
分析:
  我们要分别统计男女的信息,那么很自然的想到首先需要对于男女信息从源文件的对应的 RDD 中进行分离,这样会产生两个新的 RDD,分别包含男女信息;其次是分别对男女信息对应的 RDD 的数据进行进一步映射,使其只包含身高数据,这样我们又得到两个 RDD,分别对应男性身高和女性身高;最后需要对这两个 RDD 进行排序,进而得到最高和最低的男性或女性身高。
  对于第一步,也就是分离男女信息,我们需要使用 filter 算子,过滤条件就是包含”M” 的行是男性,包含”F”的行是女性;第二步我们需要使用 map 算子把男女各自的身高数据从 RDD 中分离出来;第三步我们需要使用 sortBy 算子对男女身高数据进行排序。

函数说明:
sortBy函数是在org.apache.spark.rdd.RDD类中实现的。
该函数最多可以传三个参数:
  第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
  第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
  第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size。

scala> val data = List(3,1,90,3,5,12)
data: List[Int] = List(3, 1, 90, 3, 5, 12)
 
scala> val rdd = sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14
 
scala> rdd.collect
res0: Array[Int] = Array(3, 1, 90, 3, 5, 12)
 
scala> rdd.sortBy(x => x).collect
res1: Array[Int] = Array(1, 3, 3, 5, 12, 90)
 
scala> rdd.sortBy(x => x, false).collect
res3: Array[Int] = Array(90, 12, 5, 3, 3, 1)
 
scala> val result = rdd.sortBy(x => x, false)
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[23] at sortBy at <console>:16
 
scala> result.partitions.size
res9: Int = 2

四、频率最高的 K 个科技关键词

  我们假设某搜索引擎公司要统计过去一年搜索频率最高的 K 个科技关键词或词组,为了简化问题,我们假设关键词组已经被整理到一个或者多个文本文件中,并且文档具有以下格式。

Spark
Hadoop
HDFS
Open Source
IBM Big Insights
big data
cloud computing
IBM BlueMix
Coudant
Apache HBase
Java
C++
Maven CXF OSGI
HDFS
cloud computing
OpenStack
OpenStack
Spark
spark
SpringBoot
SpringCloud
DB2
No SQL Database
Storm
IBM BlueMix
IBM Integration Bus

我们可以看到一个关键词或者词组可能出现多次,并且大小写格式可能不一致。
分析:
  要解决这个问题,首先我们需要对每个关键词出现的次数进行计算,在这个过程中需要识别不同大小写的相同单词或者词组,如”Spark”和“spark” 需要被认定为一个单词。对于出现次数统计的过程和 word count 案例类似;其次我们需要对关键词或者词组按照出现的次数进行降序排序,在排序前需要把 RDD 数据元素从 (k,v) 转化成 (v,k);最后取排在最前面的 K 个单词或者词组。

第一步,我们需要使用 map 算子对源数据对应的 RDD 数据进行全小写转化并且给词组记一次数,然后调用 reduceByKey 算子计算相同词组的出现次数;
第二步我们需要对第一步产生的 RDD 的数据元素用 sortByKey 算子进行降序排序;
第三步再对排好序的 RDD 数据使用 take 算子获取前 K 个数据元素。

五、每种图书每天的平均销量

给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6)键值对的key表示图书名称,value表示每天图书销量,请计算出每个键对应的平均值,也就是每种图书每天的平均销量。

六、数据去重

原始数据:
file1:

2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c

file2:

2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c

数据输出:

2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d

数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。

相关文章

网友评论

      本文标题:Spark开发--Scala--编程--示例(十)

      本文链接:https://www.haomeiwen.com/subject/lishgctx.html