集群内存计算平台。
0 java环境准备
jdk1.8
rpm -ivh jdk-8u92-linux-x64.rpm
cat >/etc/profile.d/java.sh<<EOF
export JAVA_HOME=/usr/java/jdk1.8.0_121**
export CLASSPATH=.:\$JAVA_HOME/jre/lib/rt.jar:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar
export PATH=\$PATH:\$JAVA_HOME/bin
EOF
source /etc/profile.d/java.sh
python3.7
安装 ...略...
1 spark安装
下载
cd /usr/local
wget https://www.apache.org/dyn/closer.lua/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
解压
tar -xf spark-***-bin-hadoop**.tgz
设置环境变量
vi /etc/profile
#SET SPARK_HOME
export SPARK_HOME=/usr/local/spark-2.4.0-bin-hadoop2.7
export PATH=$SPARK_HOME/bin:$PATH
source /etc/profile
修改spark日志级别(可选)
cd $SPARK_HOME/conf
cp log4j.properties.template log4j.properties
vim log4j.properties
将log4j.rootCategory=INFO, console
改为log4j.rootCategory=WARN, console
2 运行
2.1 运行pyspark
cd /usr/local/spark-2.4.0-bin-hadoop2.7
./bin/pyspark

2.2 直接从python运行pyspark
vi /etc/profile
# python can call pyspark directly
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark:$SPARK_HOME/pyt
hon/lib/py4j-0.10.7-src.zip:$PYTHONPATH

Ctrl+D 退出shell。
2.3 简单的例子
2.3.1 python统计文件行数
Using Python version 3.7.6 (default, Jan 23 2020 15:01:43)
SparkSession available as 'spark'.
>>> lines = sc.textFile("README.md")
>>> lines.count()
105
>>> lines.first()
'# Apache Spark'
>>>
每个spark应用都是由一个驱动程序(driver program,包含应用的main函数)来发起集群上的各种并行操作。spark shell本身也是一个驱动程序。
驱动程序通过一个SparkContext对象来访问spark,代表了对计算集群的一个连接。shell里的sc变量如下所示:
>>> sc
<SparkContext master=local[*] appName=PySparkShell>
sc.textFile
创建RDD。驱动器一般管理多个执行器(executor)节点,比如进行count()
操作,不同的节点会统计文件的不同部分的行数。上面例子为单机模式,单个节点运行。
另外,有很多用于传递函数的API,可以将操作运行在集群上。例如统计上面README.md里含有“Python”关键字的行。
>>> lines = sc.textFile("README.md")
>>> pythonLines = lines.filter(lambda line:"Python" in line)
File "<stdin>", line 1
pythonLines = lines.filter(lambda line:"Python" in line)
^
IndentationError: unexpected indent
>>> pythonLines = lines.filter(lambda line: "Python" in line)
>>> pythonLines.count()
3
>>> pythonLines.first()
'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'
2.3.2 python统计单词数
spark自带的实例。
./bin/spark-submit ./examples/src/main/python/wordcount.py "test.txt"
[root@iz2ze2rwdd1fpvc1pfe3wtz spark-2.4.0-bin-hadoop2.7]# cat test.txt
python
python
python
python
python
#This file is tested for wordscount.py
python
python
[root@local spark-2.4.0-bin-hadoop2.7]# ./bin/spark-submit ./examples/src/main/python/wordcount.py "test.txt"
20/09/10 14:27:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
python: 7
#This: 1
file: 1
is: 1
tested: 1
for: 1
: 1
wordscount.py: 1
2.3.3 map求list各元素平方
>>> nums = sc.parallelize([1,2,3,4])
>>> squared = nums.map(lambda x: x*x).collect()
>>> print(squared)
[1, 4, 9, 16]
- 读取外部数据集
textFile()
parallelize()
-
makeRDD()
当调用parallelize()
方法的时候,不指定分区数时,用系统给出的分区数;而调用makeRDD()
方法时,会为每个集合对象创建最佳分区,这对后续的调用优化有帮助。
collect()
不要轻易用collect,集群中用了collect产生的数组据说是保存在主节点中,其他节点访问不到,所以还要用到广播变量广播道其他节点,不然就会报空指针的错误。
3 RDD
弹性分布式数据集RDD(Resilient distributed dataset)。
pythonLines.persist()
将RDD持久化到内存中
- 转化操作 (filter,map,flatmap-切词用)
- 伪集合操作
RDD1.distict()
——去重,变集合
RDD1.union(RDD2)
——并集
RDD1.intersection(RDD2)
——交集
RDD1.subtract(RDD2)
——减
RDD1.cartesion(RDD2)
——笛卡尔积 - 行动操作
4 Pair RDD
用可控分区的方式把常被一起访问的数据放在同一个节点上,可以大大减少应用的通信开销。
分布式数据集选择分区方式
Vs本地数据集选择数据机构
- 在python中使用第一个单词作为key创建出一个pair RDD
pair = lines.map(lambda x: (x.split("")[0],x))
4.1 初始化pair RDD
>>> list = ["Hadoop","Spark","Hive","Spark"]
>>> type(list)
<class 'list'>
>>> rdd = sc.parallelize(list)
>>> type(rdd)
<class 'pyspark.rdd.RDD'>
>>> pairRDD = rdd.map(lambda word : (word,1))
>>> type(pairRDD)
<class 'pyspark.rdd.PipelinedRDD'>
>>> pairRDD.foreach(print)
('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)
或者pairRDD1 = sc.parallelize([('Hadoop',1),('Spark',1),('Hive',1),('Spark',1)])
4.2 键值对转化操作
常用的键值对转化操作:reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等。
合并具有相同键的值
>>> pairRDD.reduceByKey(lambda x,y : x+y).foreach(print)
('Hadoop', 1)
('Spark', 2)
('Hive', 1)
>>>
group分组
>>> pairRDD.groupByKey().foreach(print)
('Spark', <pyspark.resultiterable.ResultIterable object at 0x7f8356da3150>)
('Hive', <pyspark.resultiterable.ResultIterable object at 0x7f8356da30d0>)
('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f8356da3150>)
>>> PairRDDTmp=pairRDD.groupByKey()
>>> print(PairRDDTmp.collect())
[('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f59634ee150>), ('Spark', <pyspark.resultiterable.ResultIterable object at 0x7f59634ee750>), ('Hive', <pyspark.resultiterable.ResultIterable object at 0x7f59634ee950>)]
keys() & values()
>>> pairRDD.keys()
PythonRDD[46] at RDD at PythonRDD.scala:53
>>> pairRDD.keys().foreach(print)
Spark
Hive
Hadoop
Spark
>>> pairRDD.values().foreach(print)
1
1
1
1
mapValues(func)
对键值对RDD中的每个value都应用一个函数,key不变。
>>> pairRDD.mapValues( lambda x : x+1).foreach(print)
('Hadoop', 2)
('Spark', 2)
('Hive', 2)
('Spark', 2)
join内连接
>>> pairRDD1.foreach(print)
('Hive', 1)
('Spark', 1)
('Spark', 1)
('Hadoop', 1)
>>> pairRDD2.foreach(print)
('Spark', 2)
('Hive', 1)
('Hadoop', 1)
>>> pairRDD1.join(pairRDD2)
PythonRDD[69] at RDD at PythonRDD.scala:53
>>> pairRDD1.join(pairRDD2).foreach(print)
('Hadoop', (1, 1))
('Spark', (1, 2))
('Spark', (1, 2))
('Hive', (1, 1))
统计均值
思路:先想办法计数,可以添加一个字段。然后再对次数和value分别求和,最后相除即可。
>>> rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
>>> type(rdd)
<class 'pyspark.rdd.RDD'>
>>> rdd.mapValues(lambda x : (x,1)).foreach(print)
('hadoop', (4, 1))
('spark', (6, 1))
('hadoop', (6, 1))
('spark', (2, 1))
>>> rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y : (x[0]+y[0],x[1] + y[1]))
PythonRDD[84] at RDD at PythonRDD.scala:53
>>> rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y : (x[0]+y[0],x[1] + y[1])).foreach(print)
('hadoop', (10, 2))
('spark', (8, 2))
>>> rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y : (x[0]+y[0],x[1] + y[1])).mapValues(lambda x : (x[0] / x[1])).collect()
[('hadoop', 5.0), ('spark', 4.0)]
网友评论