美文网首页
Spark基础系列之四--RDD编程基础下

Spark基础系列之四--RDD编程基础下

作者: 微生活_小阿楠 | 来源:发表于2020-05-04 08:26 被阅读0次

传送门
Spark实战系列之一--Spark是什么
Spark实战系列之二--什么是RDD以及RDD的常用API
Spark实战系列之三--RDD编程基础上
Spark实战系列之四--RDD编程基础下
Spark实战系列之五--键值对RDD
Spark实战系列之六--数据读写
Spark实战系列之七--综合案例
Spark基础系列之八--Spark SQL是什么
Spark基础系列之九--使用Spark SQL读写数据库
传送门

前言

在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。

三、持久化

  • 可以通过持久化(缓存)机制避免这种重复计算的开销
  • 可以使用persist()方法对一个RDD标记为持久化
  • 之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正的计算后,才会把计算结果进行持久化
  • 持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用

1)针对上面的示例,增加持久化语句以后的执行过程如下:


四、分区

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上

1)分区的作用

  • 增加并行度
  • 减少通信开销


2)RDD分区原则

  • RDD分区的一个原则是使得分区的个数尽量对于集群中的CPU核心(core)数目

3)设置分区的个数

  • 3.1)在调用textFile()和parallelize()方法的时候手动指定分区个数即可,语法格式如下:
    sc.textFile(path,partitionNum)
val array =Array(1,2,3,4,5)
val rdd = sc.parallelize(array,2)     //设置两个分区

  • 3.2)使用reparition方法重新设置分区个数
    通过转换操作得到新RDD时,直接调用repartition方法即可。例如:
val data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)
data.partitions.size    //显示data这个RDD的分区数量,结果为2

val rdd = data.repartition(1)
rdd.partitions.size    //显示data这个RDD的分区数量,结果为1

4)自定义分区方法


五、练习

给定文件如下:包含序号,性别和身高

1 F 170
2 M 178
3 M 174
4 F 165

统计
1)男生总数、女生总数
2)男生最高和最低的身高、
3)女生最高和最低的身高。

import org.apache.spark.{SparkConf, SparkContext}

//1.统计男生总数,女生总数 2.男生最高和最低的身高 3.女生最高和最低的身高
object Count{
  def main(args:Array[String]){

    val input = "file:///usr/local/spark/mycode/wordcount/Count.txt"
    val conf = new SparkConf().setAppName("Count").setMaster("local")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(input)

    //1.统计男生总数,女生总数
    val all_boy_count = textFile.flatMap(line => line.split(" ")).filter(line => line.contains("M")).map(word => (word,1)).reduceByKey(_+_)
    all_boy_count.foreach{case(k,v) => println("男生的总数: " + v)}

    val all_girl_count = textFile.flatMap(line => line.split(" ")).filter(line => line.contains("F")).map(word => (word,1)).reduceByKey((x,y) => x+y)
    all_girl_count.foreach{case(k,v) => println("女生的总数: "+ v)}

    //2.男生最高和最低的身高
    val boy_highest = textFile.map(x => x.split(" ")).map(line => {
      val gender = line(1)
      val  height = line(2)
      (gender,height)
    }).filter(x => x._1=="M")
    println("男生最高的身高: " + boy_highest.map(x => x._2).max)
    println("男生最低的身高: " + boy_highest.map(x => x._2).min)

    //3.女生最高和最低的身高
    val girl_highest = textFile.map(x => x.split(" ")).map(line => {
      val gender = line(1)
      val  height = line(2)
      (gender,height)
    }).filter(x => x._1=="F")
    println("女生最高的身高: " + girl_highest.map(x => x._2).max)
    println("女生最低的身高: " + girl_highest.map(x => x._2).min)

  }
}




相关文章

网友评论

      本文标题:Spark基础系列之四--RDD编程基础下

      本文链接:https://www.haomeiwen.com/subject/ckbfghtx.html