美文网首页Linux大数据教程
SparkCore基础(一)

SparkCore基础(一)

作者: Z尽际 | 来源:发表于2017-05-31 17:49 被阅读299次

    * SparkCore基础(一)

    学习Spark,首先要熟悉Scala,当然你说你会Python或者Java能不能玩Spark?能!但是不推荐,首推Scala,因为Scala非常便捷,而且Scala有非常好的交互式编程体验(当然了,在这里,Python也不差)。其次呢,我们要对Hadoop的MapReduce要是有一定的了解。不然,学习起来,是会稍微费点功夫。好,不扯这么多了,相关的故事啊,疑问啊可以评论留言询问或者百度查询,我们现在直接进入正题。

    Spark特征简述

    * Spark是什么

    官方描述:Spark is a fast and general engine for large-scale data processing

    ** Spark是一个快速的,通用的,大数据规模的运算引擎。这是一个非常精准的描述。

    ** Spark是基于MapReducer实现的通用的分布式计算框架,所以它继承了MapReduce的优点,同时还支持将Job运算任务产生的中间结果和最终结果保存在内存中。

    * Spark优势

    ** Spark的中间数据放到内存中,对于迭代运算效率更高

    ** 运算速度奇快

    ** 更灵活的数据操作,比如:map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等等

    * Spark不适合做什么

    ** 不适合做增量变化的应用模型

    * Spark支持语言

    Java、Scala、Python

    * 适用场景讨论

    ** 适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场*合,受益就相对较小。

    Spark下载

    一般情况下,我们使用spark之前,都需要下载源码,然后根据自己的集群环境(也就是Hadoop版本)进行编译,然后再安装使用。

    Spark下载:

    http://spark.apache.org/downloads.html

    打开页面后,做出如下选择,即可开始下载源码

    在这里我们使用1.6.1的源码

    Spark编译

    在此我们简单介绍两种方式:

    ** SBT编译

    这是一个类似Maven的仓库,基于Scala

    ** Maven编译

    命令:

    ** make-distribution.sh编译

    修改源码根目录下的make-distribution.sh文件,修改内容如图:

    依次为:配置Spark版本,Scala版本,Hadoop版本,是否支持Hive,1为支持

    配置镜像:注意,如果编译的是原版,请添加此镜像,如果编译的是CDH版本的,请注意去掉此镜像。

    配置域名解析服务器:

    $ sudo vi /etc/resolv.conf,配置如下:

    nameserver 8.8.8.8

    nameserver 8.8.4.4

    最后执行命令:

    注意要支持yarn和hive

    世界充满爱之编译好的Spark传送门(分别包含包含Apache和CDH版本的):

    链接:http://pan.baidu.com/s/1eRBJtjs 密码:t03u

    Spark运行模式

    ** Local

    即本地模式

    ** Standalone

    即Spark自带的集群模式,分为Master节点和Worker节点,顾名思义,一个管理者,多个干活的。:)

    ** Yarn

    国内相当主流的一种运行部署模式,只是目前Yarn分配的Container是不能够动态伸缩的,后续可能会考虑支持。

    ** Mesos

    Spark在出生的时候就考虑支持该框架,很灵活,但国内使用似乎不多,感兴趣请自行研究之。

    Spark安装部署

    将Spark解压出来,然后到conf目录下,自己将template文件拷贝出文后提到的文件进行配置即可,在之前的章节我们已经提到过很多次,此步骤想必应该非常熟练了,不再赘述了。

    Local模式:

    spark-env.sh 文件配置如下:

    Spark测试案例之Local模式

    在案例开始前,请确保你的HDFS是可用的,并且spark-shell在active的NameNode节点上运行。此刻建议你已经熟知Hadoop中MapReduce的编写过程以及运行原理。

    案例一:基于本地模式的WordCount,words.txt中的内容:

    Step1、进入spark根目录使用$ bin/spark-shell命令启动spark,如下图:

    Step2、读取/input/words.txt文件,尝试检查一下words.txt文件有多少行数据,操作如下:

    scala> val rdd = sc.textFile("/input/words.txt")

    scala> rdd.count

    当然了,统计词频,这个步骤可以省略,在此只是想验证一下自己读取到的数据有没有问题

    好,大家可以看到,有3行数据,每一行都有若干英文单词。那么这里面涉及到几个问题需要拿出来讨论一下:

    1、什么是rdd?

    RDD is a fault-tolerant collection of elements that can be operated on in parallel,RDD是弹性分布式数据集,全称Resilient Distributed Datasets,具有分布式,高容错性等特点,在这里,刚开始接触的话,你可暂且理解为一个集合就可以了,一个数据集合。

    2、什么是sc?

    sc的全称是SparkContext,即Spark的上下文对象,这个理解可以类比于在Hadoop阶段我们在MapReduce中接触到的Context,不管是读取文件还是其他数据操作,都依赖于SparkContext的实例化。在这里,sc即一个实例化好的SparkContext对象。

    我们通过sc.textFile方法读取到HDFS系统中存放的words.txt文件信息,该方法返回一个RDD对象,之后我们通过rdd对象调用count方法,来查看读取到的文件中数据有多少行。

    Step3、利用得到的rdd对象进行数据的拆分,即,每一个单词都拆分成一个RDD对象,比如类似这样的理解:RDD<String> rdd = new RDD("hadoop");那么使用scala在spark中如何做呢?请看:

    scala> val wordRdd = rdd.flatMap(line => line.split(" "))

    然后我们使用wordRdd显示一下第一个单词看一看:

    scala> wordRdd.first

    Step4、将分割出来的每一个单词做Map映射

    scala> val mapRdd = wordRdd.map(word => (word, 1))

    这是scala的高阶函数,注意不理解请重新复习Scala语言。该语句的意思是:将wordRdd中存放的单词映射为一个tuple元组,元组中有两个元素,第一个元素为单词,第二个元素为当前单词本次的个数,固定为1,这个1就像Hadoop阶段中Map的LongWritable一样,这个word就像Text一样。

    Step5、这一步要做的就是讲map映射出来的数据集进行reduce运算

    scala> val reduceRdd = mapRdd.reduceByKey((x, y) => x + y)

    该行代码的意思是将某一个单词的好多个1(当然如果进行Combine操作了,也许可能不是多个1,如果你无法理解我这一句在说什么,请继续前进,然后重新复习Hadoop的MapReduce相关知识点)进行相加运算。

    Step6、查看一下结果

    scala> reduceRdd.collect

    显示出来了,而且执行过程非常的迅速,你懂得。

    当然了,以上的操作,完全可以使用一句话来实现,并且代码的体现形式可以非常骚气,如:

    scala> sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect

    Step7、当然了结果也可以输出到HDFS系统当中,比如:

    scala> reduceRdd.saveAsTextFile("/output/spark/test01")

    案例二:基于案例一,进行二次排序,即,将统计出的词频结果按照降序或者升序排列

    sc.textFile("/input/words.txt")

    .flatMap(_.split(" "))

    .map((_, 1))

    .reduceByKey(_ + _)

    .map(x => (x._2, x._1))

    .sortByKey()

    .map(x => (x._2, x._1))

    .collect

    Step1、得到案例一的统计好的词频结果,然后做一个map映射,将单词和单词出现的次数颠倒过来,也就是说,(hadoop, 1)变成(1, hadoop),这样做的原因是因为OrderedRDDFunctions类中有一个方法叫做:sortByKey,意思是按照Key的大小进行排序,默认参数是升序,如图:

    为了使用该方法,我们这么做:

    上一个案例,我们得到:

    val reduce = sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    然后:

    val reverseRdd = reduce.map(x => (x._2, x._1))

    然后我们看一眼这个RDD集合:

    Step2、直接使用sortByKey进行默认排序

    val sortRdd = reverseRdd.sortByKey()

    Step3、排序结束你不得给人家再反转回来?所以:

    sortRdd.map(x => (x._2, x._1)).collect,如图:

    当然了,以上分解步骤一气呵成最爽快:

    sc.textFile("/input/words.txt")

    .flatMap(_.split(" "))

    .map((_, 1))

    .reduceByKey(_ + _)

    .map(x => (x._2, x._1))

    .sortByKey()

    .map(x => (x._2, x._1))

    .collect

    Step4、当然了,sortByKey方法也可以实现倒序,如:

    sc.textFile("/input/words.txt")

    .flatMap(_.split(" "))

    .map((_, 1))

    .reduceByKey(_ + _)

    .map(x => (x._2, x._1))

    .sortByKey(false)

    .map(x => (x._2, x._1))

    .collect

    Step5、二次排序还可以使用top

    top源码:

    这是一个柯里化的函数,top命令是查看前多少条数据,如图可见,在查看之时,元素也是排序好的

    比如:

    sc.textFile("/input/words.txt")

    .flatMap(_.split(" "))

    .map((_, 1))

    .reduceByKey(_ + _)

    .map(x => (x._2, x._1))

    .top(12)

    输出如图:

    Spark运行模式之Standalone

    配置:spark-env.sh


    Master节点:SPARK_MASTER_IP=z01

    Master节点端口号:SPARK_MASTER_PORT=7077

    Master WebUI端口号:SPARK_MASTER_WEBUI_PORT=8080

    Worker节点可用CPU核心数:SPARK_WORKER_CORES=2

    Worker可用内存:SPARK_WORKER_MEMORY=2g

    Worker端口号:SPARK_WORKER_PORT=7078

    Worker WebUI端口号:SPARK_WORKER_WEBUI_PORT=8081

    允许在每台机器上开启几个Worker进程,默认为1个SPARK_WORKER_INSTANCES=1

    配置:slaves

    即配置允许哪几台机器当做Woker节点

    以上配置完成后,scp到其他集群节点

    启动:

    Master

    $ sbin/start-master.sh

    Worker

    $ sbin/start-slaves.sh

    完成后通过z01:8080端口访问即可如图所示:

    也可以JPS看一下进程:

    在Standalone上运行Spark

    首先,查看一下spark的帮助文档来引导该怎么做:

    $ bin/spark-shell --help

    注意红框内的内容,那么接下来,我们应该知道怎么让spark运行在standalone上了:

    $ bin/spark-shell --master spark://z01:7077

    如图:注意红框内容

    尖叫提示:如果直接不加参数的使用spark-shell方式启动,则还是在本地模式(Local)启动的。

    Spark测试案例之Standalone模式

    案例一:跑一个一气呵成的WordCount

    scala> sc.textFile("/input/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect

    WEBUI,http://192.168.122.200:4040/jobs/ 如图:

    可以看到,有一个Job任务已经运行完毕了。

    案例二:做一个每日的PV分析

    Step1、首先,我们将网站的访问数据导入到hive当中,执行:

    $ cat hql-file/track-log.hql

    其中track-log.hql文件如下:

    该部分内容可以参看Hive框架基础(一)

    Step2、通过Hive查看track_log文件在哪

    hive> desc formatted track_log;

    如图:注意红框内容,对于我们来讲,有用的即:/user/hive/warehouse/track_log/2015082818

    Step3、将日志数据读入到RDD中等待分析

    scala> val rdd = sc.textFile("/user/hive/warehouse/track_log/2015082818")

    Step4、清洗无效的数据,即空白行,以及url字段为空的,我们要过滤掉

    1、先过滤空白行

    2、再分割字段值

    3、最后过滤url字段为空的

    综合来写:

    scala> val validRdd = rdd.filter(line => line.length > 0).map(_.split("\t")).filter(arr => arr(1).length > 0)

    当然了,此时你可以count一下,看看过滤后剩下多少数据

    scala> validRdd.count

    Step4、将URL做map映射,比如做出这样的映射:(今日日期, 1)

    那么今日的日期在tracktime字段,属于分割后的数组的第17个索引处

    在hive中我们查看一下该日期的格式:

    hive> select tracktime from track_log limit 1;

    如图:

    那么截取出2015-08-28应该很容易,所以:

    scala> val mapRdd = validRdd.map(arr => (arr(17).substring(0, 10), 1))

    Step5、你懂得,再来一个Reduce即可

    scala> val reduceRdd = mapRdd.reduceByKey(_ + _)

    完事之后可以查看一下结果:

    scala> reduceRdd.collect

    如图:

    当然了也可以一气呵成走你:

    scala> sc.textFile("/user/hive/warehouse/track_log/2015082818")

    .filter(line => line.length > 0)

    .map(_.split("\t"))

    .filter(arr => arr(1).length > 0)

    .map(arr => (arr(17).substring(0, 10), 1))

    .reduceByKey(_ + _)

    .collect

    Step6、我们使用Hive来验证一下

    注意如果你的Yarn没有启动,需要将Hive设置成Local模式:

    hive> set hive.exec.mode.local.auto = true;

    然后执行:

    结果如图:

    对比可知,两个结果是一样的。

    案例三:PV和UV分析

    PV:即页面访问次数

    UV:即不同用户访问页面次数

    Step1、读取网站日志文件生成RDD对象

    scala> val rdd = sc.textFile("/user/hive/warehouse/track_log")

    Step2、过滤不必要的数据,并生成map映射,注意此时的操作与之前的案例略有不同,请注意观察,如图:

    scala> val mapRdd = rdd.filter(_.length > 0).map(line => {

    | val arr = line.split("\t")

    | val date = arr(17).substring(0, 10)

    | val guid = arr(5)

    | val url = arr(1)

    | (date, guid, url)

    | }).filter(tuple => tuple._3.length > 0)

    Step3、可选步骤,此处可以将数据cache到内存中,注意,cache后,不会立刻缓存到内存中,需要执行一个action,比如count,take,collect都可以

    scala> mapRdd.cache

    scala> mapRdd.count

    在此之后就可以在4040端口的页面是storge选项中看到缓存到内存中的数据信息,如图:

    Step4、统计PV

    scala> val pvRdd = mapRdd.map(tuple => (tuple._1, 1)).reduceByKey(_ + _)

    scala>  pvRdd.first,如图:

    Step5、UV统计

    scala> val uvRdd = mapRdd.map(tuple => (tuple._1 + "_" + tuple._2, 1)).distinct.map(tuple => {

    val arr = tuple._1.split("_")

    (arr(0), 1)

    }).reduceByKey(_ + _)

    此时可以自行使用uvRdd.first查看结果,不再展示

    Step6、合并PV和UV的结果进行显示

    union方式:

    scala> val pv_uvRdd = pvRdd.union(uvRdd)

    scala> pv_uvRdd.collect,如图:

    join方式:

    scala> val pv_uvRdd = pvRdd.join(uvRdd)

    scala> pv_uvRdd.first,如图:

    验证:使用Hive或者SparkSQL验证结果一致性

    首先创建SQL语句:

    SparkSQL方式:

    scala> val sql = """ 上边的SQL代码 """,如图:

    然后执行:

    scala> val result = sqlContext.sql(sql)

    scala> result.show()

    尖叫提示:如果你的hive使用了thrift的metastore方式,请把hive的hive-site.xml文件软连接到spark的conf目录下!!否则上述指令将会出现找不到table的错误。

    HIVE方式:直接使用Hive客户端执行上面的SQL语句,如图:

    Spark任务历史服务

    对于Yarn有mr-historyserver

    对于Spark有SparkHistory

    所以应该很容易明白这是一个任务日志的历史服务,比如你可以查看昨天半夜运行的任务情况。

    开启这个服务也很简单:

    可以参看:http://spark.apache.org/docs/1.6.1/monitoring.html

    Step1、配置参数

    配置:spark-env.sh,日志默认是保存在本地的,此刻我们将日志保存到HDFS系统当中如图:

    配置:spark-defaults.conf,spark启动时默认加载的配置文件

    Step2、在HDFS系统中创建目录/user/z/spark-events

    Step3、将配置文件重新scp到其他节点之后,重启服务,然后开启历史服务

    $ sbin/start-history-server.sh

    JPS看一眼:

    然后在浏览器打开:http://z01:18080/

    如图:

    Step4、测试玩一玩?

    $ bin/spark-shell --master spark://z01:7077

    随便执行执行一个我们之前的案例任务,即可,运行几个任务,成功运行几个,再失败几个,如图:

    注意红框内容,如果你当前的spark-shell没有退出,那么该任务就是属于正在运行的任务。请自行切换观察即可。

    * 总结

    对于RDD到RDD的 操作,我们称之为Transformation操作

    例如:我们在案例中使用的过滤,或者map,或者reduce等等

    对RDD到其他类型的操作,我们称之为Action

    例如:我们在案例中使用的top,或者take、collect等操作

    另外RDD中的数据可以持久化到内存中来操作,使用:

    rdd.cache来操作,比较适用于频繁使用的。

    这一节我们大概了解了Spark的操作,也应该更加深刻的熟悉了Scala的操作。下一节我们针对Spark进行更深入的探讨。


    IT全栈公众号:

    QQ大数据技术交流群(广告勿入):476966007


    下一节:SparkCore基础(二)

    相关文章

      网友评论

      • 99648107d929:英语不好怎么看官网呢?官网那么多东西侧重点看什么?这些问题好像很多人都有.
      • 99648107d929:编译源码有什么好处和作用?

      本文标题:SparkCore基础(一)

      本文链接:https://www.haomeiwen.com/subject/mxrkzttx.html