(十五)大数据学习之Spark

作者: Movle | 来源:发表于2019-11-23 21:39 被阅读0次

    Spark

    一.Spark生态圈:

    (1)Spark Core : RDD(弹性分布式数据集)
    (2)Spark SQL
    (3)Spark Streaming
    (4)Spark MLLib:协同过滤,ALS,逻辑回归等等 --> 机器学习
    (5)Spark Graphx : 图计算

    二.什么是Spark

    1.Spark是什么:
    Spark是一个针对大规模数据处理的快速通用引擎。

    • Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的Spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。

    2.特点:

    • 快:快100倍(Hadoop 3 之前)
    • 易用:支持多种语言开发
    • 通用性:生态系统全。
    • 易用性:兼容Hadoop

    3.最大特点:基于内存

    • Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

    三.Spark体系架构

    1.Spark集群的体系架构图解:

    image.png

    2.Spark的主从结构

    image.png

    四.Spark的安装部署

    1.Spark的安装部署方式有以下几种模式:
    (1)Standalone: 本机调试

    (2)YARN
    (3)Mesos
    (4)Amazon EC2
    2.执行过程:
    一个Worker有多个 Executor。 Executor是任务的执行者,按阶段(stage)划分任务。————> RDD

    五.Spark的搭建:

    1.准备工作

    • jdk
    • 配置主机名
    • 免密码登录

    2.伪分布式模式安装:
    (1)下载
    (2)上传到linux
    (3)解压
    (4)修改配置文件

    • 配置文件:conf/spark-env.sh
    cd /opt/module
    
    mv spark-2.1.0-bin-hadoop2.7/ spark/       //重命名spark文件夹
    
    cd /opt/module/spark/conf 
    
    mv spark-env.sh.template spark-env.sh    //重命名配置文件
    
    vi spark-env.sh
    

    修改内容如下:

    export JAVA_HOME=/opt/module/jdk1.8.0_144
    export SPARK_MASTER_HOST=bigdata121      //主节点的服务器名
    export SPARK_MASTER_PORT=7077           //主节点端口号
    //下面的可以不写,默认
    export SPARK_WORKER_CORES=1
    export SPARK_WORKER_MEMORY=1024m
    
    image.png
    • 配置文件:conf/slave
    mv slaves.template slaves
    
    vi slaves
    

    新增内容:

    bigdata121
    
    image.png

    (5)启动:

    cd /opt/module/spark
    
    sbin/start-all.sh
    
    image.png

    (6)验证:192.168.127.121:8080

    image.png

    3.全分布的安装部署:
    (1)下载
    (2)上传到linux
    (3)解压
    (4)修改配置文件

    • 配置文件:conf/spark-env.sh
    cd /opt/module
    
    mv spark-2.1.0-bin-hadoop2.7/ spark/       //重命名spark文件夹
    
    cd /opt/module/spark/conf 
    
    mv spark-env.sh.template spark-env.sh    //重命名配置文件
    
    vi spark-env.sh
    

    修改内容如下:

    export JAVA_HOME=/opt/module/jdk1.8.0_144
    export SPARK_MASTER_HOST=bigdata121      //主节点的服务器名
    export SPARK_MASTER_PORT=7077           //主节点端口号
    //下面的可以不写,默认
    export SPARK_WORKER_CORES=1
    export SPARK_WORKER_MEMORY=1024m
    
    image.png
    • 配置文件:conf/slave
    mv slaves.template slaves
    
    vi slaves
    

    新增内容:

    bigdata122
    bigdata123
    
    image.png

    (5)拷贝到其他两台服务器

    cd /opt/module
    
    
    src -r spark/ bigdata122:/opt/module
    
    src -r spark/ bigdata123:/opt/module
      
    

    (6)启动Spark集群:

    cd /opt/module/spark
    
    sbin/start-all.sh
    

    六.Spark的HA

    1.回顾HA:
    (1)HDFS,Yarn,Hbase,Spark:都是主从结构
    (2)单点故障
    2.基于文件系统的单点恢复
    (1)主要用于开发或测试环境。当spark提供目录保存spark Application和worker的注册信息,并将他们的恢复状态写入该目录中,这时,一旦Master发生故障,就可以通过重新启动Master进程(sbin/start-master.sh),恢复已运行的spark Application和worker的注册信息。
    (2)基于文件系统的单点恢复,主要是在spark-en.sh里对SPARK_DAEMON_JAVA_OPTS设置

    配置参数 参考值
    spark.deploy.recoveryMode 设置为FILESYSTEM开启单点恢复功能,默认值:NONE
    spark.deploy.recoveryDirectory Spark 保存恢复状态的目录

    3.基于Zookeeper的Standby Masters
    (1)ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到ZooKeeper,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。

    image.png
    配置参数 参考值
    spark.deploy.recoveryMode 设置为ZOOKEEPER开启单点恢复功能,默认值:NONE
    spark.deploy.zookeeper.url ZooKeeper集群的地址
    spark.deploy.zookeeper.dir Spark信息在ZK中的保存目录,默认:/spark

    (2)修改spark-env.sh参考:

    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER 
    -Dspark.deploy.zookeeper.url=bigdata12:2181,bigdata13:2181,bigdata14:2181 
    -Dspark.deploy.zookeeper.dir=/spark"
    

    (3)另外:每个节点上,需要将以下两行注释掉。

    image.png

    (4)同步到其他两台服务器
    (5)ZooKeeper中保存的信息

    image.png image.png

    七.执行Spark Demo程序

    1.使用Spark Shell
    (1)spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。相当于REPL ,作为一个独立的Application运行
    (2)两种模式:

    • 本地模式:spark-shell 不接任何参数,代表本地模式
    • 集群模式:spark-shell后面带有参数

    (3)启动Spark shell:

    spark-shell
    

    参数说明:

    --master spark://spark81:7077     //指定Master的地址
    --executor-memory 2g      //指定每个worker可用内存为2G
    --total-executor-cores 2       //指定整个集群使用的cup核数为2个
    

    例如:

    spark-shell --master spark://spark81:7077 --executor-memory 2g --total-executor-cores 2
    

    (4)在Spark shell中编写WordCount程序

    sc.textFile("hdfs://192.168.88.111:9000/data/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.88.111:9000/output/spark/wc")
    

    参数说明:

    (5)wordcount程序,处理本地文件,把结果打印到屏幕上

    scala> sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
                    .flatMap(_.split(" "))
                    .map((_,1))
                    .reduceByKey(_+_)
                    .collect
                    
                    res0: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))
    

    (6)wordcount程序,处理HDFS文件,结果保存在hdfs上

    sc.textFile("hdfs://node1:8020/tmp_files/test_WordCount.txt")
                    .flatMap(_.split(" "))
                    .map((_,1))
                    .reduceByKey(_+_)
                    .saveAsTextFile("hdfs://node1:8020/output/0331/test_WordCount")
    

    (7)单步运行wordcount --->RDD

    scala> val rdd1 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
                    rdd1: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[12] at textFile at <console>:24
    
    scala> 1+1
    res2: Int = 2
    
    scala> rdd1.collect
    res3: Array[String] = Array(I love Beijing, I love China, Beijing is the capital of China)
    
    scala> val rdd2 = rdd1.flatMap(_.split(" "))
                    rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at flatMap at <console>:26
    
    scala> rdd2.collect
    res4: Array[String] = Array(I, love, Beijing, I, love, China, Beijing, is, the, capital, of, China)
    
    scala> val rdd3 = rdd2.map((_,1))
                    rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:28
    
    scala> rdd3.collect
    res5: Array[(String, Int)] = Array((I,1), (love,1), (Beijing,1), (I,1), (love,1), (China,1), (Beijing,1), (is,1), (the,1), (capital,1), (of,1), (China,1))
    
    scala> val rdd4 = rdd3.reduceByKey(_+_)
                    rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:30
    
    scala> rdd4.collect
    res6: Array[(String, Int)] = Array((is,1), (love,2), (capital,1), (Beijing,2), (China,2), (I,2), (of,1), (the,1))
    

    (8)RDD 弹性分布式数据集
    (9)Scala复习:

    • flatten:把嵌套的结果展开
    scala>List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
    res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
    
    • flatmap:相当于一个 map + flatten
    scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
                        myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))
    scala> myList.flatMap(x=>x.map(_*2))
    res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)                 
    myList.flatMap(x=>x.map(_*2))
    

    flatmao执行过程:

    • 将 List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9) 调用 map(_*2) 方法。x 代表一个List
    • flatten
      2.在IDEA中编写WordCount程序
      (1)需要的jar包:$SPARK_HOME/jars/*.jar
      (2)创建Scala Project,并创建Scala Object、或者Java Class
      (3)书写源代码,并打成jar包,上传到Linux

    Scala版本

    image.png

    (4)运行程序:

    spark-submit --master spark://spark81:7077 
    --class mydemo.WordCount jars/wc.jar 
    hdfs://192.168.88.111:9000/data/data.txt 
    hdfs://192.168.88.111:9000/output/spark/wc1
    

    Java版本(直接输出在屏幕)

    image.png

    (4)运行程序:

    spark-submit --master spark://spark81:7077 
    --class mydemo.JavaWordCount jars/wc.jar 
    hdfs://192.168.88.111:9000/data/data.txt
    

    八.Spark运行机制及原理分析

    1.WordCount执行的流程分析

    image.png

    2.Spark提交任务的流程析

    image.png

    九.RDD和RDD特性,RDD的算子

    1.RDD:弹性分布式数据集
    (1)什么是RDD?

    • RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

    (2)RDD的属性:

    • 一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
    • 一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
    • RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
    • 一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
    • 一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

    2.如何创建RDD
    (1)通过SparkContext.parallelize方法来创建(通过sc.parallelize进行创建)

    scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
    rdd1:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29
                
    scala> rdd1.partitions.length
    res35: Int = 3
    scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
    rdd1:org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:29
    
    scala> rdd1.partitions.length
    res36: Int = 2
    

    (2)通过外部数据源来创建

    sc.textFile()           
    scala> val rdd2 = sc.textFile("/usr/local/tmp_files/test_WordCount.txt")
    rdd2:org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[35] at textFile at <console>:29
    

    (3)RDD的类型:TransformationAction
    3.RDD的基本原理:

    image.png

    4.Transformation
    (1)RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

    转换 含义
    map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
    filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
    flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
    mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
    mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
    sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
    union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
    intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
    distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
    groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
    reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
    aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])
    sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
    sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
    join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
    cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
    cartesian(otherDataset) 笛卡尔积
    pipe(command, [envVars])
    coalesce(numPartitions)
    repartition(numPartitions)
    repartitionAndSortWithinPartitions(partitioner)

    5.Action

    动作 含义
    reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
    collect() 在驱动程序中,以数组的形式返回数据集的所有元素
    count() 返回RDD的元素个数
    first() 返回RDD的第一个元素(类似于take(1))
    take(n) 返回一个由数据集的前n个元素组成的数组
    takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
    takeOrdered(n, [ordering])
    saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
    saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
    saveAsObjectFile(path)
    countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
    foreach(func) 在数据集的每一个元素上,运行函数func进行更新。

    十.RDD特性

    1.RDD的缓存机制:
    (1)作用:提高性能
    (2)使用:标识RDD可以被缓存 persist cache
    (3)可以缓存的位置:

    2.RDD的容错机制:通过检查点来实现
    (1)
    (1)复习检查点:HDFS中的检查点:有SecondaryNamenode来实现日志的合并。
    (2)RDD的检查点:容错

    • 概念:血统 Lineage
    • 理解:表示任务执行的生命周期。
    • WordCount textFile ---> redceByKey
    • 如果血统越长,越容易出错。
    • 假如有检查点,可以从最近的一个检查点开始,往后面计算。不用重头计算。

    (3)RDD检查点的类型:
    a.基于本地目录:需要将Spark shell 或者任务运行在本地模式上(setMaster("local"))

    image.png

    b.HDFS目录:用于生产,集群模式

    image.png
    sc.setCheckPointDir(目录)
                    
    //举例:设置检查点
    scala> var rdd1 = sc.textFile("hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt")
    rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.109.131:8020/tmp_files/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24
    
    //设置检查点目录:
    scala> sc.setCheckpointDir("hdfs://192.168.109.131:8020/sparkckpt")
    
    //标识rdd1可以执行检查点操作
    scala> rdd1.checkpoint
    
    scala> rdd1.count
    res2: Long = 923452 
    

    3.依赖关系:宽依赖,窄依赖
    (1)RDD的依赖关系:

    • RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
      (2)窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
      总结:窄依赖我们形象的比喻为独生子女

    (3)宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition
    总结:宽依赖我们形象的比喻为超生

    4.Spark任务中的Stage

    • DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。
    image.png

    十一.RDD的高级算子

    1.mapPartitionsWithIndex:对RDD中的每个分区(带有下标)进行操作,下标用index表示
    通过这个算子,我们可以获取分区号。

    def mapPartitionsWithIndex[U](
            f: (Int, Iterator[T]) ⇒ Iterator[U], 
            preservesPartitioning: Boolean = false)(
            implicit arg0: ClassTag[U]): RDD[U]
            
    //参数:f是个函数参数 f 中第一个参数是Int,代表分区号,第二个Iterator[T]代表分区中的元素
    

    例如:

    scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
    
    scala> def fun1(index:Int, iter:Iterator[Int]) : Iterator[String] = {
                 | iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
                 | }
    fun1: (index: Int, iter: Iterator[Int])Iterator[String]
    
    scala> rdd1.mapPartitions
            mapPartitions   mapPartitionsWithIndex
    
    scala> rdd1.mapPartitionsWithIndex(fun1).collect
    res3: Array[String] = Array(
            [partId : 0 , value = 1 ], [partId : 0 , value = 2 ], [partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ], [partId : 2 , value = 6 ], [partId : 2 , value = 7 ], [partId : 2 , value = 8 ])
    

    2.aggregate:聚合操作。类似于分组。
    (1)先对局部进行聚合操作,再对全局进行聚合操作。

    //调用聚合操作
    scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
    
    scala> rdd2.mapPartitionsWithIndex(fun1).collect
    res4: Array[String] = Array(
            [partId : 0 , value = 1 ], [partId : 0 , value = 2 ], [partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])
    
    scala> import scala.math._
    import scala.math._
    
    scala> rdd2.aggregate(0)(max(_,_),_+_)
    res6: Int = 7
    

    说明:
    (2)对字符串操作

    scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27
    
    scala> rdd2.aggregate("")(_+_,_+_)
    res11: String = abcdef
    
    scala> rdd2.aggregate("*")(_+_,_+_)
    res12: String = **def*abc
    

    (3)复杂的例子:
    a.

    scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
    rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27
            
    scala> def fun1(index:Int, iter:Iterator[String]) : Iterator[String] = {
                 | iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
                 | }
    
    scala> rdd3.mapPartitionsWithIndex(fun1).collect
    res17: Array[String] = Array(
            [partId : 0 , value = 12 ], [partId : 0 , value = 23 ], [partId : 1 , value = 345 ], [partId : 1 , value = 4567 ])
            
    scala> rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y)=>x+y)
    res13: String = 42
    

    执行过程:

    • 第一个分区:
      (a)第一次比较: "" "12" 长度最大值 2 2-->"2"
      (b)第二次比较: “2” “23” 长度最大值 2 2-->"2"

    • 第二个分区:
      (a)第一次比较: "" "345" 长度最大值 3 3-->"3"
      (b)第二次比较: “3” “4567” 长度最大值 4 4-->"4"

    b.

    rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
            
    scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
    res18: String = 11
    

    执行过程:

    • 第一个分区:
      第一次比较: "" "12" 长度最小值 0 0-->"0"
      第二次比较: “0” “23” 长度最小值 1 1-->"1"

    • 第二个分区:
      第一次比较: "" "345" 长度最小值 0 0-->"0"
      第二次比较: “0” “4567” 长度最小值 1 1-->"1"
      c.aggregateByKey:类似于aggregate,区别:操作的是 key value 的数据类型。

    3.其他高级算子:

    十二.编程案例

    1.分析日志
    (1)需求:找到访问量最高的两个网页

    • 第一步:对网页的访问量求和
    • 第二步:排序,降序

    (2)创建自定义分区

    (3)使用JDBCRDD 操作数据库

    (4)操作数据库:把结果存放到数据库中

    Spark SQL

    Spark sql基础

    1.什么是Spark SQL

    • Spark SQL 是spark的一个模块。来处理 结构化 的数据,不能处理非结构化的数据
      2.特点:
      (1)容易集成:
    • 不需要单独安装。

    (2)统一的数据访问方式

    • 结构化数据的类型:JDBC JSon Hive parquer文件 都可以作为Spark SQL 的数据源
    • 对接多种数据源,且使用方式类似
      (3)完全兼容hive
    • 把Hive中的数据,读取到Spark SQL中运行。

    (4) 支持标准的数据连接

    • JDBC

    3.为什么学习Spark SQL

    • 执行效率比Hive高
    • hive 2.x 执行引擎可以使用 Spark
      4.核心概念:表(DataFrame DataSet)
    • mysql中的表:表结构、数据
    • DataFrame:Schema、RDD(数据)
    • DataSet 在spark1.6以后,对DataFrame做了一个封装。
      5.创建DataFrame
      测试数据:员工表、部门表
      (1)第一种方式:使用case class
      a.定义Schema
      b.读取文件
      c.把每行数据,映射到Emp上
      d.生成DataFrame

    (2)第二种方式 使用Spark Session
    (3)直接读取一个带格式的文件。

    6.操作DataFrame
    (1)DSL语句
    (2)SQL语句
    注意:不能直接执行SQL,需要生成一个视图,再执行sql。
    (3)多表查询

    7.操作DataSet
    (1)跟DataFrame类似,是一套新的接口。高级的Dataframe
    (2)创建DataSet

    • 使用序列来创建DataSet。
    • 使用JSON数据来创建DataSet
    • 使用其他数据

    (3)DataSet案例

    (4)多表查询

    • 创建部门表
    • 创建员工表
    • 执行多表查询:等值连接
    • 多表连接后再筛选

    7.Spark SQL中的视图
    (1)视图是一个虚表,不存储数据。
    (2)两种类型:

    • 普通视图(本地视图):只在当前Session中有效。createOrReplaceTempView createTempView
    • 全局视图: createGlobalTempView:在不同的Session中都有用 把全局视图创建在命名空间中:global_temp中。类似于一个库。

    二.使用数据源

    1.在Spark SQL中,可以使用各种各样的数据源来操作。 结构化
    2.使用load函数、save函数

    • load函数是加载数据,save是存储数据
      注意:使用load 或 save时,默认是Parquet文件。列式存储文件。
      3.Parquet文件:列式存储文件,是Spark SQL 默认的数据源
    • 就是一个普通的文件

    (1)把其他文件,转换成Parquet文件
    (2)支持Schema的合并

    4.json文件

    5.JDBC
    (1)使用JDBC操作关系型数据库,加载到Spark中进行分析和处理。
    (2)方式一:
    (3)方式二:

    6.使用hive
    (1)spark SQL 完全兼容hive
    (2)需要进行配置

    • 拷贝一下文件到spark/conf目录下:
    • Hive 配置文件: hive-site.xml
    • Hadoop 配置文件:core-site.xml hdfs-site.xml

    (3)配置好后,重启spark

    (4)启动Hadoop 与 hive

    三.在IDE中开发Spark SQL

    四.性能优化

    1.用内存中缓存表的数据
    直接读取内存的值,来提高性能
    2.了解性能优化的相关参数:参考讲义

    Spark Streaming

    一.常用的实时计算引擎(流式计算)

    1.Apache Storm:真正的流式计算

    2.Spark Streaming :严格上来说,不是真正的流式计算(实时计算)
    把连续的流式数据,当成不连续的RDD
    本质:是一个离散计算(不连续)
    3.Apache Flink:真正的流式计算。与Spark Streaming相反。
    把离散的数据,当成流式数据来处理
    4.JStorm

    二.Spark Streaming基础

    1.什么是 Spark Streaming。

    • Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
      易于构建灵活的、高容错的流式系统。

    2.特点:

    • 易用,已经集成到Spark中
    • 容错性:底层RDD,RDD本身具有容错机制
    • 支持多种语言:Java Scala Python

    3.演示官方的Demo
    往Spark Streaming中发送字符串,Spark 接收到以后,进行计数
    使用消息服务器 netcat Linux自带
    yum install nc.x86_64
    nc -l 1234
    注意:总核心数 大于等于2。一个核心用于接收数据,另一个用于处理数据
    在netcat中写入数据 Spark Streaming可以取到

    4.开发自己的NetWorkWordCount程序,和Spark Core类似
    问题:Hello Hello
    Hello World
    现在现象:(Hello,2)
    (Hello , 1) (World , 1)
    能不能累加起来?保存记录下以前的状态?
    通过Spark Streaming提供的算子来实现

    三.高级特性:

    1.什么是DStream?离散流

    • 把连续的数据变成不连续的RDD
    • 因为DStream的特性,导致,Spark Streaming不是真正的流式计算

    2.重点算子讲解
    (1)updateStateByKey
    默认情况下,Spark Streaming不记录之前的状态,每次发数据,都会从0开始
    现在使用本算子,实现累加操作。
    (2)transform

    3.窗口操作

    • 窗口:对落在窗口内的数据进行处理,也是一个DStream,RDD
    • 举例:每10秒钟把过去30秒的数据采集过来
    • 注意:先启动nc 再启动程序 local[2]

    4.集成Spark SQL : 使用SQL语句来处理流式数据
    5.缓存和持久化:和RDD一样
    6.支持检查点:和RDD一样

    四.数据源

    Spark Streaming是一个流式计算引擎,就需要从外部数据源来接收数据
    1.基本的数据源

    • 文件流:监控文件系统的变化,如果文件有增加,读取文件中的内容
      希望Spark Streaming监控一个文件夹,如果有变化,则把变化采集过来
    • RDD队列流:可以从队列中获取数据
    • 套接字流:socketTextStream

    2.高级数据源
    (1)Flume
    (2)Spark SQL 对接flume有多种方式:

    • push方式:flume将数据推送给Spark Streaming
    • custom sink 模式:比第一种有更好的健壮性和容错性。使用这种方式,flume配置一个sink。
    • 使用官方提供的spark sink组件
      需要把 spark-streaming-flume-sink_2.10-2.1.0.jar 拷贝到flume lib下
      需要把 spark-streaming-flume-sink_2.10-2.1.0.jar 拷贝到IDE的lib下添加到build path中
    (3)Kafka
        在讲Kafka时,举例。
    

    四.性能优化的参数

    (1)性能优化:
    spark submit的时候,程序报OOM错误
    程序跑的很慢
    (2)方法:调整spark参数
    conf.set...

    性能调优

    一.Spark 性能优化概览:

    • Spark的计算本质是,分布式计算。
    • 所以,Spark程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者内存。
    • CPU、网络带宽,是运维来维护的。
    • 聚焦点:内存。
    • 如果内存能够容纳下所有的数据,那就不需要调优了。
    • 如果内存比较紧张,不足以放下所有数据(10亿量级---500G),需要对内存的使用进行性能优化。
    • 比如:使用某些方法减少内存的消耗。

    二.Spark性能优化,主要针对在内存的使用调优。

    三.Spark性能优化的技术:

    1.使用高性能序列化类库
    2.优化数据结构
    3.对于多次使用的RDD进行持久化、checkpoint
    4.持久化级别:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化
    5.Java虚拟机垃圾回收调优
    6.Shuffle调优,1.x版本中,90%的性能问题,都是由于Shuffle导致的。

    四.其他性能优化:

    1.提高并行度
    2.广播共享数据
    等等。。。

    五.诊断Spark内存使用:首先要看到内存使用情况,才能进行针对性的优化。

    1.内存花费:
    (1)每个Java对象,都有一个对象头,占用16字节,包含一些对象的元信息,比如指向他的类的指针。

    • 如果对象本身很小,比如int,但是他的对象头比对象自己还大。

    (2)Java的String对象,会比他内存的原始数据,多出40个字节。

    • String内部使用的char数组来保存内部的字符串序列,并且还要保存诸如输出长度之类的信息。
    • char使用的是UTF-16编码,每个字符会占2个字节。比如,包含10个字符的String,2*10+40=60字节
      (3)Java中的集合类型,比如HashMap和LinkedList,内部使用链表数据结构。
    • 链表中的每个数据,使用Entry对象包装。
    • Entry对象,不光有对象头,还有指向下一个Entry的指针,占用8字节。

    (4)元素类型为原始数据类型(int),内部通常会使用原始数据类型的包装类型(Integer)来存储元素。
    2.如何判断Spark程序消耗内存情况?:答案是预估
    (1)设置RDD的并行度。

    • 两种方法创建RDD,parallelize() textFile() 在这两个方法中,传入第二个参数,设置RDD的partition数量。
    • 在SparkConfig中设置一个参数:
    • spark.default.parallelism
    • 可以统一设置这个application中所有RDD的partition数量

    (2)将RDD缓存 cache()

    (3)观察日志:driver日志

    /usr/local/spark-2.1.0-bin-hadoop2.7/work
                19/04/13 22:01:05 INFO MemoryStore: Block rdd_3_1 stored as values in memory (estimated size 26.0 MB, free 339.9 MB)
                19/04/13 22:01:06 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 26.7 MB, free 313.2 MB)
    

    (4)将这个内存信息相加,就是RDD内存占用量。

    六.使用高性能序列化类库

    1.数据序列化概述

    • 数据序列化,就是将对象或者数据结构,转换成特定的格式,使其可在网络中传输,或存储在内存或文件中。

    • 反序列化,是相反的操作,将对象从序列化数据中还原出来。

    • 序列化后的数据格式,可以是二进制,xml,Json等任何格式。

    • 对象、数据序列化的重点在于数据的交换与传输。

    • 在任何分布式系统中,序列化都是扮演着一个重要的角色。

    • 如果使用的序列化技术,操作很慢,或者序列化后的数据量还是很大,会让分布式系统应用程序性能下降很多。

    • 所以,Spark性能优化的第一步,就是进行序列化的性能优化。

    • Spark自身默认会在一些地方对数据进行序列化,比如Shuffle。另外,我们使用了外部数据(自定义类型),也要让其课序列化。

    • Spark本身对序列化的便捷性和性能进行了取舍

    • 默认情况下:Spark倾向于序列化的便捷性,使用了Java自身提供的序列化机制,很方便使用。

    • 但是,Java序列化机制性能不高,序列化速度慢,序列化后数据较大,比较占用内存空间。

    2.kryo

    • Spark支持使用kryo类库来进行序列化。
    • 速度快,占用空间更小,比Java序列化数据占用空间小10倍。
      3.如何使用kryo序列化机制
      (1)设置Spark conf
    bin/spark-submit will also read configuration options from conf/spark-defaults.conf, 
    in which each line consists of a key and a value separated by whitespace. For example:
    
    spark.master            spark://5.6.7.8:7077
    spark.executor.memory   4g
    spark.eventLog.enabled  true
    spark.serializer   org.apache.spark.serializer.KryoSerializer
    

    (2)使用kryo是,要求需要序列化的类,要提前注册,以获得高性能

    4.kryo类库的优化
    (1)优化缓存大小

    • 如果注册的自定义类型,本身特别大(100个字段),会导致要序列化的对象太大。此时需要对kyro本身进行优化。因为kryo内部的缓存,可能不能存放这么大的class对象。
    spark.kryoserializer.buffer.max  //设置这个参数,将其调大。
    

    (2)预先注册自定义类型

    • 虽然不注册自定义类型,kryo也可以正常工作,但会保存一份他的全限定类名,耗费内存。
    • 推荐预先注册要序列化的自定义类型。

    七.优化数据结构

    1.概述

    • 要减少内存的消耗,除了使用高效的序列化类库外,还要优化数据结构。
    • 避免Java语法特性中所导致的额外内存开销。
    • 核心:优化算子函数内部使用到的局部数据或算子函数外部的数据。
    • 目的:减少对内存的消耗和占用。

    2.如何做?
    (1)优先使用数组以及字符串,而不是集合类。即:优先使用Array,而不是ArrayList、LinkedList、HashMap

    • 使用int[] 会比List<Integer> 节省内存
      (2)将对象转换成字符串。
    • 企业中,将HashMap、List这种数据,统一用String拼接成特殊格式的字符串
    Map<Integer,Person> persons = new HashMap<Integer,Person>()
    
    可以优化为:
    
    "id:name,address"
    String persons = "1:Andy,Beijing|2:Tom,Tianjin...."
    

    (3)避免使用多层嵌套对象结构
    (4)对于能够避免的场景,尽量使用int代替String

    • 虽然String比List效率高,但int类型占用更少内存,比如:数据库主键,id,推荐使用自增的id,而不是uuid

    八.rdd.cache checkpoint

    九.持久化级别:MEMORY_ONLY ---> MEMORY_ONLY_SER 序列化

    十.Java虚拟机的调优

    1.概述

    • 如果在持久化RDD的时候,持久化了大量的数据,那么Java虚拟机的垃圾回收就可能成为一个瓶颈
    • Java虚拟机会定期进行垃圾回收,此时会追踪所有Java对象,并且在垃圾回收时,找到那些已经不再使用的对象。
    • 清理旧对象,给新对象腾出空间。
    • 垃圾回收的性能开销,是与内存中的对象数量成正比。
    • 在做Java虚拟机调优之前,必须先做好上面的调优工作,这样才有意义。
    • 必须注意顺序

    2.Spark GC原理
    见图片

    3.监测垃圾回收

    • 我们可以进行监测,比如多久进行一次垃圾回收以及耗费的时间等等。
    spark-submit脚本中,添加一个配置
    --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimesStamps"
    
    注意:这个是输出到worker日志中,而不是driver日志。
    /usr/local/spark-2.1.0-bin-hadoop2.7/logs  worker日志
    /usr/local/spark-2.1.0-bin-hadoop2.7/work  driver日志
    

    4.优化Executor内存比例
    (1)目的:减少GC次数。

    • 对于GC调优来说,最重要的就是调节,RDD的缓存占用的内存空间 与 算子执行时创建对象所占用的内存空间 的比例
    • 对于默认情况,Spark使用每个Executor 60% 的内存空间来缓存RDD,在task运行期间所创建的对象,只有40%内存空间来存放。
    //使用:
    conf.set("spark.storage.memoryFraction",0.5)
    

    5.Java GC 调优 (-)

    十一.shuffle原理

    1.优化前
    图片
    2.优化后
    图片

    十二.其他调优

    1.提高并行度
    2.广播共享数据

    Spark Mllib:MLlib 是 Spark 可以扩展的机器学习库。

    一.MLlib概述

    MLlib 是 Spark 可以扩展的机器学习库。
    Spark在机器学习方面具有得天独厚的有事,有以下几个原因:
    1.机器学习算法一般都有多个步骤迭代计算,需要在多次迭代后,获得足够小的误差或者收敛才会停止。

        double wucha = 1.0
        while(wucha>=0.00001){
            建模  wucha -= 某个值
        }
        
        模型计算完毕
        
        当迭代使用Hadoop的MapReduce计算框架时,每次都要读写硬盘以及任务启动工作,导致很大的IO开销。
        而Spark基于内存的计算模型天生擅长迭代计算。只有在必要时,才会读写硬盘。
        所以Spark是机器学习比较理想的平台。
    

    2.通信,Hadoop的MapReduce计算框架,通过heartbeat方式来进行通信和传递数据,执行速度慢。

    • spark 有高效的 Akka 和 Netty 的通信系统,通行效率高。
    • SPark MLlib 是Spark 对常用的机器学习算法的实现库,同时包括相关测试和数据生成器。

    二.什么是机器学习?

    1.机器学习的定义。
    A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P,
    if its performance at tasks in T, as measured by P, improves with experience E。

    2.三个关键词:算法、经验、模型评价
    在数据的基础上,通过算法构建出模型,并进行评价
    如果达到要求,则用该模型测试其他数据
    如果不达到要求,要调整算法来重新建立模型,再次进行评估
    循环往复,知道获得满意的经验

    3.应用:金融反欺诈、语音识别、自然语言处理、翻译、模式识别、智能控制等等

    4.基于大数据的机器学习
    (1)传统的机器学习算法,由于技术和单机存储的现值,只能在少量数据上使用。即,依赖于数据抽样。
    (2)传统的机器学习存在的问题:很难做好随机,导致学习的模型不准确。
    (3)在大数据上进行机器学习,直接处理全量数据并进行大量迭代计算。
    (4)Spark本身计算优势,适合机器学习。
    (5)另外 spark-shell pyspark 都可以提供及时查询工具

    5.MLlib

    • MLlib是Spark机器学习库,简化机器学习的工程实践工作,方便扩展到更大规模。

    • 集成了通用的学习算法:分类、回归、聚类、协同过滤、降维等等

    • 另外,MLlib本身在Spark中,数据清洗、SQL、建模放在一起。

    三、线性回归

    四、余弦相似性
    https://blog.csdn.net/u012160689/article/details/15341303

    Spark Graphx

    一.Spark Graphx 是什么?

    1.是Spark 的一个模块,主要用于进行以图为核心的计算,还有分布式图计算
    2.Graphx 底层基于RDD计算,和RDD共用一种存储形态。在展示形态上,可以用数据集来表示,也可以用图来表示。

    二.Spark GraphX 有哪些抽象?

    1.顶点
    RDD[(VertexId,VD)]表示
    VertexId 代表了顶点的ID,是Long类型
    VD 是顶点的属性,可以是任何类型
    2.边
    RDD[Edge[ED]]表示
    Edge表示一个边
    包含一个ED类型参数来设定属性
    另外,边还包含了源顶点ID和目标顶点ID

    3.三元组
    三元组结构用RDD[EdgeTriplet[VD,ED]]表示
    三元组包含一个边、边的属性、源顶点ID、源顶点属性、目标顶点ID、目标顶点属性。

    4.图
    Graph表示,通过顶点和边来构建。

    相关文章

      网友评论

        本文标题:(十五)大数据学习之Spark

        本文链接:https://www.haomeiwen.com/subject/rfkkwctx.html