Spark

作者: 阳光课代表 | 来源:发表于2019-12-03 15:39 被阅读0次

Scala

因为Scala是基于Java虚拟机的,所有Scala代码都需要编译为字节码,然后在JVM上运行,所以Scala和Java是可以无缝操作的,Scala可以调用任意Java代码。

Map

val map = Map(("a", 1), ("b", 2))

for ((key, value) <- map) {
    println("key:" + key + ", value:" + value)
}

Tuple

val t = ("a", 1, 2)
println(t._3)

函数式编程

将函数赋值给变量

    def main(a: Array[String]) = {
        val addFun = add _
        println(addFun(1, 2))
    }

    def add(a: Int, b: Int): Int = {
        a + b
    }

匿名函数

        val addFun = (a:Int, b: Int) => {a + b}
        println(addFun(1, 2))

高阶函数

接收其它函数作为函数的参数

Spark

Spark基本工作原理

  • 分布式
    客户端(在本地编写Spark程序的机器)将Spark程序提交到Spark集群,Spark集群一般从HDFS上读取数据,然后分布式存放在集群的各个节点上。
  • 主要基于内存
  • 迭代式计算
    节点1、2并行计算,计算完成后可能将结果数据交给3、4节点再接着计算,最后处理的结果可以写入到MySQL中或直接返回客户端。
    Spark和Hadoop最大的不同在于迭代式计算,MapReduce分为两个阶段,map和reduce,两个阶段完了就结束了,所以一个job只能在map和reduce中做。而Spark的计算可以分为n个阶段,因为它是内存迭代式的,处理完一个阶段后可以继续往下处理很多阶段,而不仅仅是2个阶段,所以Spark可以提供更强大的功能。
    注:所有计算操作都是针对多个节点上的数据进行并行计算的。

RDD(弹性分布式数据集)

1、是一个数据集合,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD可以被并行操作;
2、RDD最重要的特性就是提供了容错性,可以自动从节点失败中恢复。即假如某个节点上的RDD partition因为节点故障导致数据丢失,那么RDD会自动通过自己的数据来源重新计算该partition,这一切对使用者是透明的;
3、RDD的数据默认是放在内存中的,但在内存资源不足时,Spark会自动将RDD数据写入磁盘(弹性),这一切对于用户是透明的;

Spark的核心编程时什么?

1、定义RDD,即RDD是从哪里读取数据来创建RDD,比如hdfs、mysql、hive等;
2、定义对RDD的计算操作,这个叫做算子,比如map、reduce、flatMap;
3、循环往复的过程;

什么是Spark开发?

1、核心开发:离线批处理/延迟性的交互式数据处理;
2、SQL查询:底层都是RDD和计算操作;
3、实时计算:底层都是RDD和计算操作;

Scala开发WordCount程序

        val lines: RDD[String] = sc.textFile("E:/a.txt")
        
        val wordsRDD: RDD[String] = lines.flatMap(line => line.split(" "))

        val tupleWordsRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))

        val countRDD: RDD[(String, Int)] = tupleWordsRDD.reduceByKey((x, y) => x + y)

        countRDD.foreach(count => {
            print(count._1)
            print(count._2)
            println()
        })

WordCount程序原理

创建RDD

  • 使用集合创建
    主要用于测试;
        /* 配置Spark */
        val conf: SparkConf = new SparkConf()
                .setAppName("WordCountMain")
                .setMaster("local") //local代表本地单线程模式运行

        val sc: SparkContext = new SparkContext(conf)

        val list: List[Int] = List(1, 2, 3)

        val rdd: RDD[Int] = sc.makeRDD(list)

        rdd.foreach(x => println(x))

注:makeRDD方法的第二个参数可以只能将RDD切分成多少个partition,Spark会为每一个partition运行一个task来进行处理。官方建议是,为集群中每隔CPU创建2~4个partition。Spark默认会根据集群的情况来设置partition的数量。

  • 使用本地文件创建
    主要用于临时性地处理一些存储了大量数据的文件;

  • 使用hdfs创建
    应该是最常用的生产环境处理方式;

map算子

        val nums: List[Int] = List(1, 2, 3)

        val numsRDD: RDD[Int] = sc.makeRDD(nums)

        /* 使用map算子将集合中的每个元素都乘2 */
        val newNumsRDD: RDD[Int] = numsRDD.map(num => num * 2)

        newNumsRDD.foreach(num => println(num))
        val lines: List[String] = List("hello spark", "hello scala")

        val linesRDD: RDD[String] = sc.makeRDD(lines)

        val words: RDD[Array[String]] = linesRDD.map(line => line.split(" "))

        words.foreach(arr => {
            arr.foreach(word => println(word))
        })

filter算子

        val nums: List[Int] = List(1, 2, 3, 4)

        val numsRDD: RDD[Int] = sc.makeRDD(nums)

        val evenNumsRDD: RDD[Int] = numsRDD.filter(num => num % 2== 0)

        evenNumsRDD.foreach(num => println(num)) // 2 4
        val lines: List[String] = List("hello", "world", "hi")

        val linesRDD: RDD[String] = sc.makeRDD(lines)

        val hasHWords: RDD[String] = linesRDD.filter(word => word.contains("h"))

        hasHWords.foreach(word => println(word)) //hello hi

flatMap算子

        val lines: List[String] = List("hello spark", "hello scala")

        val linesRDD: RDD[String] = sc.makeRDD(lines)

        val wordsRDD: RDD[String] = linesRDD.flatMap(line => line.split(" "))

        wordsRDD.foreach(word => println(word))

注意区别:flatMap是把数组里面的数据拆分到最小的粒度后返回

groupByKey算子

        val scores: List[(String, Int)] = List(("cai", 80), ("cai", 70), ("bing", 90))

        val scoresRDD: RDD[(String, Int)] = sc.makeRDD(scores)

        val scoreGrouped: RDD[(String, Iterable[Int])] = scoresRDD.groupByKey()

        scoreGrouped.foreach(score => {
            print(score._1 + "=")
            score._2.foreach(score => print(score + " "))
            println()
        })
        /*
        cai=80 70 
        bing=90 
         */

reduceByKey算子

        val scores: List[(String, Int)] = List(("cai", 80), ("cai", 70), ("bing", 90))

        val scoresRDD: RDD[(String, Int)] = sc.makeRDD(scores)

        val scoreReduecd: RDD[(String, Int)] = scoresRDD.reduceByKey((x, y) => x + y)
        scoreReduecd.foreach(score => {
            print(score._1 + "=")
            println(score._2)

        })
        /*
        cai=150
        bing=90
         */

注:reduceByKey的作用对像是(key, value)形式的rdd,而reduce有减少、压缩之意,reduceByKey的作用就是对相同key的数据进行处理,最终每个key只保留一条记录。
保留一条记录通常有两种结果。一种是只保留我们希望的信息,比如每个key出现的次数。第二种是把value聚合在一起形成列表,这样后续可以对value做进一步的操作,比如排序。
reduceByKey会寻找相同key的数据,当找到这样的两条记录时会对其value(分别记为x,y)做(x,y) => x+y的处理,即只保留求和之后的数据作为value。反复执行这个操作直至每个key只留下一条记录。

sortByKey算子

        val scores: List[(String, Int)] = List(("cai", 80), ("cai", 70), ("bing", 90))

        val scoresRDD: RDD[(String, Int)] = sc.makeRDD(scores)

        val scoreSortedRDD: RDD[(String, Int)] = scoresRDD.sortByKey()

        scoreSortedRDD.foreach(score => {
            print(score._1)
            print(score._2)
            println()
        })
        /*
        bing90
        cai80
        cai70
         */

join算子

        val stuList: List[(Int, String)] = List((1, "cai"), (2, "zhong"), (3, "bing"))
        val scoreList: List[(Int, Int)] = List((1, 70), (2, 80), (3, 90))

        val stuRDD: RDD[(Int, String)] = sc.makeRDD(stuList)
        val scoreRDD: RDD[(Int, Int)] = sc.makeRDD(scoreList)

        val joinedRDD: RDD[(Int, (String, Int))] = stuRDD.join(scoreRDD)

        joinedRDD.foreach(stu => {
            println("学号:" + stu._1)
            println("姓名:" + stu._2._1)
            println("成绩:" + stu._2._2)
        })
        /*
        学号:1
        姓名:cai
        成绩:70
        学号:3
        姓名:bing
        成绩:90
        学号:2
        姓名:zhong
        成绩:80
         */

action操作案例

reduce

        val nums: Array[Int] = Array(1, 2, 3)

        val numsRDD: RDD[Int] = sc.makeRDD(nums)

        val sum: Int = numsRDD.reduce((x, y) => x + y)
        
        println(sum) //6

collect

count

take

saveAsTextFile

countByKey

foreach

RDD持久化

共享变量

相关文章

网友评论

      本文标题:Spark

      本文链接:https://www.haomeiwen.com/subject/ojuxgctx.html