2.1 下载与安装
单机Spark的安装,可以不用Hadoop,所以只需要安装JDK、scala (均可以用sudo apt-get install *)和
spark(http://spark.apache.org/downloads.html)
单机Spark是指在本地模式下运行,也就是非分布式模型,这样我们只需要用到一台机器。
2.2 Spark中Python和Scala的shell
Spark shell和其他的shell不同,在其他的shell工具中你只能使用单机的硬盘和内存来操作数据,而Spark shell可用来与分布式存储在许多机器的内存或者硬盘上的数据进行交互,并且处理过程的分发由Spark自动控制完成。
先进入spark安装文件夹下
1)Python版本的shell打开方式 bin/pyspark
2)Scala版本的shell打开方式 bin/spark-shell
2.3 Spark 核心概念简介
每个Spark应用都由一个驱动程序来发起集群上各种并行操作。驱动器程序包含应用的main函数,并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关操作。
驱动器程序通过一个SparkContext对象来访问Spark,是一个叫做sc的变量,有了sc变量,就可以用它来创建RDD。
Spark分布式执行涉及的组件.png
向Spark传递函数
当你在这些语言中使用Spark时,你也可以单独定义一个函数,然后把函数名传给Spark。
def haspython(line):
return "python" in line
pytonlines = lines.fliter(haspython)
2.4 独立应用
spark-submit 脚本会帮助我们引入Python程序的spark依赖,这个脚本为spark的PythonAPI配置好了运行环境。
bin/spark-submit my_script.py
2.4.1 初始化SparkContext
一旦完成应用与Spark的连接,接下来就需要在你的程序中导入Spark包并且创建SparkContext。可以通过先创建一个SparkConf对象来配置你的应用,然后基于这个SparkConf创建一个SparkContext对象。
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf =conf)
1.集群url,告诉Spark如何连接到集群,这里使用的local,这使得spark运行在单机单线程上而无需连接到集群
2.应用名,当连接到一个集群上,这个值可以帮助你在急群管理器的用户界面中找到你的应用。
——关闭Spark可以调用stop()方法。
2.4.2 构建独立应用
一个Python版本的单词统计例子
from pyspark import SparkContext
sc = SparkContext(master='local[1]', appName='wordcount')
# 读取文件
text = sc.textFile('/****/***/spark-2.4.0-bin-hadoop2.7/README.md')
# 切分单词
counts = text.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
print(counts.collect())
【注释】py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/spark/README.md
这是因为在使用相对路径时,系统默认是从hdfs://localhost:9000/目录下读取README.md文件的,但是README.md文件并不在这一目录下,所以sc.textFile()必须使用绝对路径,
网友评论