Spark SQL主要用作离线海量数据分析
Spark Streaming主要用作对数据实时处理
Spark的工作原理与MapReduce是如出一辙的,区别在于MapReduce是在HDFS上做计算,而Spark是在内存中做计算,这就形成了Spark这一计算引擎的优势——效率速度快。
一、RDD简介
RDD,(Resilient Distributed Datasets)
,即 弹性分布式数据集,是一个容错、并行的数据结构,也就是分布式的元素集合,在代码中RDD是一个抽象类,是不可变、可分区、里面的元素可并行计算的数据集合。每个RDD被分为多个分区,每个分区运行在集群的不同节点中。通过RDD的依赖关系形成Spark的调度顺序,其实所谓的Spark程序,就是一组RDD的操作,相当于在内存中跑MapReduce。
在Spark中对数据的所有操作不外乎创建RDD、转化已有RDD或者调用RDD操作进行计算,Spark会自动将RDD中的数据分发到集群上并将操作并行化执行。
二、RDD操作
RDD操作也称为算子,分为转换算子
和行动算子
。算子就是一系列的方法操作
,把问题转换到可解决的状态,这就称为spark中的算子。只要是转换算子,必然就会产生新的RDD,转换算子是封装这计算逻辑,只有用到了行动算子的时候才会去读取数据。即:Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
三、RDD闭包检测
就是算子以外的代码都是在Driver端执行,而算子以内的代码都是在Executor端执行的,算子内的代码用到算子外的代码就会形成了闭包效果,如果算子外的数据无法序列化,就无法传值给Executor端执行从而报错,所以runJob前会先检测闭包内的对象是否可进行序列化,这就称为闭包检测。
四、RDD依赖
从一个RDD转换产生一个新的RDD,这相邻的2个RDD之间就会建立依赖关系,这个依赖关系是会保留下来的,因为Spark的RDD不会保留中间数据,所以为了容错性通过这种将依赖关系保留下来的操作来避免执行RDD算子失败后的重新读取计算。相邻2个RDD的直接关系则为依赖
,间接产生依赖的RDD则称为血缘
。
宽依赖:每个父RDD的分区可以被多个子RDD的分区使用称为宽依赖
窄依赖:每个父RDD的分区最多只能被一个子RDD的一个分区使用称为窄依赖
rdd.toDebugString //查看血缘关系
五、RDD任务划分
RDD任务切分为:Application、Job、Stage、Task
Application:初始化一个Spark上下文环境对象SparkContext即生成一个Application
Job:一个行动算子就会生成一个Job
Stage:Stage个数等于宽依赖(ShuffleDependency)个数+1
Task:一个Stage阶段,最后一个RDD的分区个数就是Task的个数
即 1 Application -> n Job -> n Stage -> n Task
六、RDD持久化
RDD是不存储数据的,而当一个RDD要重复使用时,那么则需要从头开始获取数据,也就相当于RDD对象是可以重用,而RDD数据则不可重用。所以可以想办法把一个RDD的数据给到另一个RDD前先把数据放到缓存中,然后后面的多个RDD再从缓存取即可实现持久化。 image.pngrdd.cache() //cache默认持久化操作是把数据持久化到内存中,如果保存到磁盘则使用persist方法选择存储级别
rdd.persist(存储级别) //其实cache方法底层也是调用的persist方法
持久化操作是在行动算子执行时完成的,别忘记了,只有当行动算子执行了才有数据。这种持久化场景不一定说是要用在对RDD的重用上,当然还可以用在一些比较耗时的和数据重要的场景,先把这些RDD数据给持久化下来,方便后续使用和数据安全。
sc.setCheckpointDir("hdfs路径") //创建checkpoint保存路径
rdd.checkpoint() //落盘数据到文件,当作业执行完毕后,不会被删除
cache()、persist()、checkpoint区别
cache:是将数据临时保存在内存中进行重用,但这样数据可能不安全,比如内存溢出和丢失
persist:是将数据临时保存到磁盘中进行重用,数据安全,但涉及磁盘IO,效率较低
checkpoint:是将数据长久落盘到文件中进行重用,数据安全,但会独立执行作业,则效率更低,checkpoint等同于改变了数据源了,会切断血缘关系
为了提高checkpoint效率,可以搭配cache一起使用
rdd.cache()
rdd.checkpoint()
附、WordCount程序例子实战
-
1、WordCount程序思路,如下2个文件a.txt和b.txt
image.png
-
2、使用IDEA开发一个WordCount程序,本地部署运行(代码逻辑对应上图)
先按这篇文章配置好用maven创建scala开发环境,https://www.cnblogs.com/xxbbtt/p/8143441.html
完成后接着创建一个maven项目,在pom.xml文件里添加依赖和插件,如下,下面的plugin插件可以不通过add support那种方式引入scala,从而自动可创建scala.class文件。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.meizu.xiaojiang.WordCount</groupId>
<artifactId>WordCount</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
image.png
经上面程序编写,且在项目里的input文件夹里新建好test.txt文件, 之后程序运行是会出现一个报错的:java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
,但这是不影响spark程序运行,因为本地是没有安装hadoop或者spark的部署环境,所以程序运行才会报错。可参考这篇文章解决:https://www.cnblogs.com/mdlcw/p/11106218.html。
程序结果.png
不过一般做法是window系统只负责写程序,然后程序是打包到linux集群上去运行的,集群上面才有环境,而且毕竟大数据的存储也是在集群中,如果需要调试程序的话也直接去集群上调试即可。
-
3、现在我就演示下如何把开发好的程序部署到集群上面去运行,在这里默认已经安装配置好hadoop集群和spark集群了,如果没搞定这些的话就先跳转到我这2篇文章去安装配置好集群。
1、Hadoop学习(一)入门与集群搭建
2、Spark学习(一)入门与集群搭建
要把spark这个计算框架与yarn进行整合的话,就要修改spark的spark-env.sh配置文件,在里面加上这句,是把yarn的运行路径告诉spark,集群里的4台机子都要配置。
YARN_CONF_DIR=/usr/hadoop-2.6.4/etc/hadoop
之后把上面的统计单词数的程序修改下,把读取文件那一行的路径修改为
//读取HDFS中根目录下的test.txt文件,将文件一行一行读取出来
val lines: RDD[String] = sc.textFile("hdfs://master:9000/test.txt")
把WordCount程序打包成jar文件,然后上传到spark安装目录下,再把test.txt文本文件存放到HDFS中的根目录下。
最后切换到spark安装目录下,然后执行spark-sumbit命令,如下就完事了。
\
代表的是换行,--
代表的是附带参数,--name
指的是程序名,--class
指的是类名,用全称。
./bin/spark-submit \
--master yarn-client \
--name WordCount \
--class com.meizu.xiaojiang.WordCount \
--executor-memory 1G \
--total-executor-cores 2 \
/usr/spark-2.4.3/WordCount.jar
image.png
image.png
网友评论