Spark

作者: 阳光课代表 | 来源:发表于2019-12-03 15:39 被阅读0次

    Scala

    因为Scala是基于Java虚拟机的,所有Scala代码都需要编译为字节码,然后在JVM上运行,所以Scala和Java是可以无缝操作的,Scala可以调用任意Java代码。

    Map

    val map = Map(("a", 1), ("b", 2))
    
    for ((key, value) <- map) {
        println("key:" + key + ", value:" + value)
    }
    

    Tuple

    val t = ("a", 1, 2)
    println(t._3)
    

    函数式编程

    将函数赋值给变量

        def main(a: Array[String]) = {
            val addFun = add _
            println(addFun(1, 2))
        }
    
        def add(a: Int, b: Int): Int = {
            a + b
        }
    

    匿名函数

            val addFun = (a:Int, b: Int) => {a + b}
            println(addFun(1, 2))
    

    高阶函数

    接收其它函数作为函数的参数

    Spark

    Spark基本工作原理

    • 分布式
      客户端(在本地编写Spark程序的机器)将Spark程序提交到Spark集群,Spark集群一般从HDFS上读取数据,然后分布式存放在集群的各个节点上。
    • 主要基于内存
    • 迭代式计算
      节点1、2并行计算,计算完成后可能将结果数据交给3、4节点再接着计算,最后处理的结果可以写入到MySQL中或直接返回客户端。
      Spark和Hadoop最大的不同在于迭代式计算,MapReduce分为两个阶段,map和reduce,两个阶段完了就结束了,所以一个job只能在map和reduce中做。而Spark的计算可以分为n个阶段,因为它是内存迭代式的,处理完一个阶段后可以继续往下处理很多阶段,而不仅仅是2个阶段,所以Spark可以提供更强大的功能。
      注:所有计算操作都是针对多个节点上的数据进行并行计算的。

    RDD(弹性分布式数据集)

    1、是一个数据集合,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD可以被并行操作;
    2、RDD最重要的特性就是提供了容错性,可以自动从节点失败中恢复。即假如某个节点上的RDD partition因为节点故障导致数据丢失,那么RDD会自动通过自己的数据来源重新计算该partition,这一切对使用者是透明的;
    3、RDD的数据默认是放在内存中的,但在内存资源不足时,Spark会自动将RDD数据写入磁盘(弹性),这一切对于用户是透明的;

    Spark的核心编程时什么?

    1、定义RDD,即RDD是从哪里读取数据来创建RDD,比如hdfs、mysql、hive等;
    2、定义对RDD的计算操作,这个叫做算子,比如map、reduce、flatMap;
    3、循环往复的过程;

    什么是Spark开发?

    1、核心开发:离线批处理/延迟性的交互式数据处理;
    2、SQL查询:底层都是RDD和计算操作;
    3、实时计算:底层都是RDD和计算操作;

    Scala开发WordCount程序

            val lines: RDD[String] = sc.textFile("E:/a.txt")
            
            val wordsRDD: RDD[String] = lines.flatMap(line => line.split(" "))
    
            val tupleWordsRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
    
            val countRDD: RDD[(String, Int)] = tupleWordsRDD.reduceByKey((x, y) => x + y)
    
            countRDD.foreach(count => {
                print(count._1)
                print(count._2)
                println()
            })
    

    WordCount程序原理

    创建RDD

    • 使用集合创建
      主要用于测试;
            /* 配置Spark */
            val conf: SparkConf = new SparkConf()
                    .setAppName("WordCountMain")
                    .setMaster("local") //local代表本地单线程模式运行
    
            val sc: SparkContext = new SparkContext(conf)
    
            val list: List[Int] = List(1, 2, 3)
    
            val rdd: RDD[Int] = sc.makeRDD(list)
    
            rdd.foreach(x => println(x))
    

    注:makeRDD方法的第二个参数可以只能将RDD切分成多少个partition,Spark会为每一个partition运行一个task来进行处理。官方建议是,为集群中每隔CPU创建2~4个partition。Spark默认会根据集群的情况来设置partition的数量。

    • 使用本地文件创建
      主要用于临时性地处理一些存储了大量数据的文件;

    • 使用hdfs创建
      应该是最常用的生产环境处理方式;

    map算子

            val nums: List[Int] = List(1, 2, 3)
    
            val numsRDD: RDD[Int] = sc.makeRDD(nums)
    
            /* 使用map算子将集合中的每个元素都乘2 */
            val newNumsRDD: RDD[Int] = numsRDD.map(num => num * 2)
    
            newNumsRDD.foreach(num => println(num))
    
            val lines: List[String] = List("hello spark", "hello scala")
    
            val linesRDD: RDD[String] = sc.makeRDD(lines)
    
            val words: RDD[Array[String]] = linesRDD.map(line => line.split(" "))
    
            words.foreach(arr => {
                arr.foreach(word => println(word))
            })
    

    filter算子

            val nums: List[Int] = List(1, 2, 3, 4)
    
            val numsRDD: RDD[Int] = sc.makeRDD(nums)
    
            val evenNumsRDD: RDD[Int] = numsRDD.filter(num => num % 2== 0)
    
            evenNumsRDD.foreach(num => println(num)) // 2 4
    
            val lines: List[String] = List("hello", "world", "hi")
    
            val linesRDD: RDD[String] = sc.makeRDD(lines)
    
            val hasHWords: RDD[String] = linesRDD.filter(word => word.contains("h"))
    
            hasHWords.foreach(word => println(word)) //hello hi
    

    flatMap算子

            val lines: List[String] = List("hello spark", "hello scala")
    
            val linesRDD: RDD[String] = sc.makeRDD(lines)
    
            val wordsRDD: RDD[String] = linesRDD.flatMap(line => line.split(" "))
    
            wordsRDD.foreach(word => println(word))
    

    注意区别:flatMap是把数组里面的数据拆分到最小的粒度后返回

    groupByKey算子

            val scores: List[(String, Int)] = List(("cai", 80), ("cai", 70), ("bing", 90))
    
            val scoresRDD: RDD[(String, Int)] = sc.makeRDD(scores)
    
            val scoreGrouped: RDD[(String, Iterable[Int])] = scoresRDD.groupByKey()
    
            scoreGrouped.foreach(score => {
                print(score._1 + "=")
                score._2.foreach(score => print(score + " "))
                println()
            })
            /*
            cai=80 70 
            bing=90 
             */
    

    reduceByKey算子

            val scores: List[(String, Int)] = List(("cai", 80), ("cai", 70), ("bing", 90))
    
            val scoresRDD: RDD[(String, Int)] = sc.makeRDD(scores)
    
            val scoreReduecd: RDD[(String, Int)] = scoresRDD.reduceByKey((x, y) => x + y)
            scoreReduecd.foreach(score => {
                print(score._1 + "=")
                println(score._2)
    
            })
            /*
            cai=150
            bing=90
             */
    

    注:reduceByKey的作用对像是(key, value)形式的rdd,而reduce有减少、压缩之意,reduceByKey的作用就是对相同key的数据进行处理,最终每个key只保留一条记录。
    保留一条记录通常有两种结果。一种是只保留我们希望的信息,比如每个key出现的次数。第二种是把value聚合在一起形成列表,这样后续可以对value做进一步的操作,比如排序。
    reduceByKey会寻找相同key的数据,当找到这样的两条记录时会对其value(分别记为x,y)做(x,y) => x+y的处理,即只保留求和之后的数据作为value。反复执行这个操作直至每个key只留下一条记录。

    sortByKey算子

            val scores: List[(String, Int)] = List(("cai", 80), ("cai", 70), ("bing", 90))
    
            val scoresRDD: RDD[(String, Int)] = sc.makeRDD(scores)
    
            val scoreSortedRDD: RDD[(String, Int)] = scoresRDD.sortByKey()
    
            scoreSortedRDD.foreach(score => {
                print(score._1)
                print(score._2)
                println()
            })
            /*
            bing90
            cai80
            cai70
             */
    

    join算子

            val stuList: List[(Int, String)] = List((1, "cai"), (2, "zhong"), (3, "bing"))
            val scoreList: List[(Int, Int)] = List((1, 70), (2, 80), (3, 90))
    
            val stuRDD: RDD[(Int, String)] = sc.makeRDD(stuList)
            val scoreRDD: RDD[(Int, Int)] = sc.makeRDD(scoreList)
    
            val joinedRDD: RDD[(Int, (String, Int))] = stuRDD.join(scoreRDD)
    
            joinedRDD.foreach(stu => {
                println("学号:" + stu._1)
                println("姓名:" + stu._2._1)
                println("成绩:" + stu._2._2)
            })
            /*
            学号:1
            姓名:cai
            成绩:70
            学号:3
            姓名:bing
            成绩:90
            学号:2
            姓名:zhong
            成绩:80
             */
    

    action操作案例

    reduce

            val nums: Array[Int] = Array(1, 2, 3)
    
            val numsRDD: RDD[Int] = sc.makeRDD(nums)
    
            val sum: Int = numsRDD.reduce((x, y) => x + y)
            
            println(sum) //6
    

    collect

    count

    take

    saveAsTextFile

    countByKey

    foreach

    RDD持久化

    共享变量

    相关文章

      网友评论

          本文标题:Spark

          本文链接:https://www.haomeiwen.com/subject/ojuxgctx.html