美文网首页
02-pySpark Hello World

02-pySpark Hello World

作者: 过桥 | 来源:发表于2019-10-31 19:02 被阅读0次

编写测试文件

软件解压目录上层,新建测试文件

[mongodb@mongodb02 spark-2.4.4-bin-hadoop2.7]$ cd ../
[mongodb@mongodb02 software]$ vim helloSpark
[mongodb@mongodb02 software]$ sudo vim helloSpark

hello Spark
hello World
hello Coin !

[mongodb@mongodb02 software]$ 

相关路径如下:

软件解压路径:
/home/spark/software/spark-2.4.4-bin-hadoop2.7/

[mongodb@mongodb02 software]$ ll
总用量 224704
-rw-r--r--.  1 root    root           37 10月 30 16:28 helloSpark
drwxr-xr-x. 13 mongodb mongodb       211 8月  28 05:30 spark-2.4.4-bin-hadoop2.7
-rw-r--r--.  1 root    root    230091034 10月 29 17:10 spark-2.4.4-bin-hadoop2.7.tgz

本地启动测试

[mongodb@mongodb02 spark-2.4.4-bin-hadoop2.7]$ ./bin/pyspark

# 加载文件至 RDDs
>>> lines = sc.textFile("../helloSpark")

# 返回总数
>>> lines.count()
3      

# 返回首行
>>> lines.first()
u'hello Spark'

可能遇到问题,路径异常

>>> lines = sc.textFile("../../helloSpark")
>>> lines.count()

 File "/home/spark/software/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/spark/helloSpark

RDDs 特性

RDDs血统关系图:RDDs间的依赖关系和创建关系
Spark使用血统关系图计算各RDD需求和恢复丢失的数据

延迟计算:在RDDs第一次使用 action 操作是才进行计算(减少数据间传输)
Spark 内部记录metadata表明transformations操作已被响应,加载数据也是延迟计算

默认每次RDDs上进行action操作时,Spark都会重新计算RDDs
如果想重复利用一个RDD,可以使用RDD.persist(),缓存移除unpersist()
缓存级别:MEMORY_ONLY、MEMORY_ONLY_SER、DISK_ONLY、MEMORY_AND_DISK、MEMORY_AND_DISK_SER

RDDs 常用Transformation方法

# filter 过滤,区分大小写,无法检索出匹配项
>>> lines.filter(lambda x:x.count("world") > 0).collect()
[]                                                                         

# filter 过滤,使用 count 判断
>>> lines.filter(lambda x:x.count("World") > 0).collect()
[u'hello World']

# filter 过滤,使用 in 判断
>>> lines.filter(lambda x:"hello" in x).collect()
[u'hello Spark', u'hello World', u'hello Coin !']

# map 转换
>>> hey_lines = lines.map(lambda x:(x,'hey'))
>>> hey_lines.collect()
[(u'hello Spark', 'hey'), (u'hello World', 'hey'), (u'hello Coin !', 'hey')]

# flatMap 转换,拍平
>>> lines = lines.flatMap(lambda x:x.split(" "))
>>> lines.collect()
[u'hello', u'Spark', u'hello', u'World', u'hello', u'Coin', u'!']

# distinct 去重
>>> lines.distinct().collect()
[u'!', u'World', u'Spark', u'Coin', u'hello']  

# union 添加
>>> lines.union(lines).collect()
[u'hello', u'Spark', u'hello', u'World', u'hello', u'Coin', u'!', u'hello', u'Spark', u'hello', u'World', u'hello', u'Coin', u'!']

# 创建临时 RDD,建议仅测试中使用
>>> temp = sc.parallelize(('hello','World','temp'))

# 比较两组 RDD 中重复项 / 交集
>>> lines.intersection(temp).collect()
[u'World', u'hello']  

# 仅在 lines 中存在,temp中不存在 
>>> lines.subtract(temp).collect()
[u'!', u'Spark', u'Coin']

# 仅在 temp 中存在,lines中不存在 
>>> temp.subtract(lines).collect()
['temp']

RDDs 常用Action

# collect 返回RDD所有元素,数据量大时使用 saveAsTextFile()
>>> lines.collect()
[u'hello', u'Spark', u'hello', u'World', u'hello', u'Coin', u'!']

# count 计数
>>> lines.count()
7

# 返回唯一元素出现个数
>>> lines.countByValue()
defaultdict(<type 'int'>, {u'!': 1, u'World': 1, u'Spark': 1, u'Coin': 1, u'hello': 3})

# 返回几个元素
>>> lines.take(2)
[u'hello', u'Spark']
>>> lines.take(7)
[u'hello', u'Spark', u'hello', u'World', u'hello', u'Coin', u'!']

# 根据RDD中数据比较器排序返回
>>> lines.top(2)
[u'hello', u'hello']

# 接收函数处理,返回新元素
>>> temp = sc.parallelize((1,2,3,3))
>>> temp.reduce(lambda x,y: x+y)
9

KeyValue对RDDs

# 加载样例数据
>>> lines = sc.textFile("../helloSpark")
>>> lines.collect()
[u'hello Spark', u'hello World', u'hello Coin !']
# 使用map 建立KeyValue RDD
>>> keyValueRdd = lines.map(lambda line:(line.split(" ")[0],line))
>>> keyValueRdd.collect()
[(u'hello', u'hello Spark'), (u'hello', u'hello World'), (u'hello', u'hello Coin !')]

# 建立数组RDDs
>>> lines = sc.parallelize(((1,2),(2,3),(2,4)))
# 使用reduceByKey将相同key数据相加
>>> lines.reduceByKey(lambda x,y:x+y).collect()
[(1, 2), (2, 7)]

# 使用groupByKey将相同key数据分组
>>> lines.groupByKey().collect()
[(1, <pyspark.resultiterable.ResultIterable object at 0x7f1732719ed0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7f1732719f50>)]
>>> lines.groupByKey().mapValues(list).collect()
[(1, [2]), (2, [3, 4])]                                                    

# 注:直接使用groupByKey分组后数据有嵌套,转为 list ,便于输出查看

# 使用mapValues操作每个元素的value
>>> lines.mapValues(lambda x:x+1).collect()
[(1, 3), (2, 4), (2, 5)]

# 获取keys
>>> lines.keys().collect()
[1, 2, 2]

# 获取values
>>> lines.values().collect()
[2, 3, 4]

# 根据key排序
>>> lines.sortByKey().collect()
[(1, 2), (2, 3), (2, 4)]

# 使用combineByKey将相同key数据分组(groupByKey内部调用此方法)
# 参数1、createCombiner, which turns a V into a C (e.g., creates a one-element list)
# 参数2、mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
# 参数3、mergeCombiners, to combine two C’s into a single one.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> def add(a, b): return a + str(b)
... 
>>> sorted(x.combineByKey(str, add, add).collect())
[('a', '11'), ('b', '1')]

>>> def add(a, b): return a + b
... 
>>> sorted(x.combineByKey(int, add, add).collect())
[('a', 2), ('b', 1)]

相关文章

网友评论

      本文标题:02-pySpark Hello World

      本文链接:https://www.haomeiwen.com/subject/yhatbctx.html