美文网首页
2.11.3PySpark--WordCount

2.11.3PySpark--WordCount

作者: 寒暄_HX | 来源:发表于2020-03-28 15:20 被阅读0次

总目录:https://www.jianshu.com/p/e406a9bc93a9

Python - 子目录:https://www.jianshu.com/p/50b432cb9460

使用命令行

命令行的WordCount

基本思路就是先将输入的集合转换成RDD;然后先对RDD进行map操作,将word变换成(word,1)的形式,其中的数字1代表单词出现了1次;之后进行reduceByKey(func),就是将key值相同的数据进行func操作,此处就是将相同单词的次数相加;最后将得到的结果打印在控制台。


命令行的WC
>>> list = ["zhangsan","lisi","zhangsan","lisi"]   // 定义列表
>>> rdd = sc.parallelize(list)                     // 从列表中创建RDD
>>> rdd0 = rdd.map(lambda word : (word,1))         // map阶段,将单词转换为(key,1)
>>> rdd1 = rdd0.reduceByKey(lambda a,b :a+b)       // reduce阶段,对key进行分组汇总
>>> rdd1.collect()                                 //将RDD类型的数据转化为数组,并且输出
[Stage 0:>                                            
[('zhangsan', 2), ('lisi', 2)]

本地文件的WordCount

文件 WC
>>> textFile = sc.textFile("/usr/local/spark_word/word.txt")
>>> wordCount = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)
>>> wordCount.collect()
[('apache', 4), ('hadoop', 4), ('spark', 4), ('scala', 4)]

HDFS文件的WordCount

文件 WC
>>> textFile = sc.textFile("hdfs://192.168.110.110:9000/spark/wc/input/one.txt")
>>> wordCount = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)
>>> wordCount.collect()
[('hello', 3), ('world', 1), ('hadoop', 2), ('spark', 2), ('SQL', 1), ('mapreduce', 1)]

编写py文件

列表

from pyspark import SparkConf, SparkContext
 
# 创建SparkConf和SparkContext
conf = SparkConf().setMaster("local").setAppName("lichao-wordcount")
sc = SparkContext(conf=conf)

# 可以简写成  sc = SparkContext('local', 'wordcount')
 
# 输入的数据
data=["zhangsan","lisi","zhangsan","lisi"] 
 
# 将Collection的data转化为spark中的rdd并进行操作
rdd=sc.parallelize(data)
resultRdd = rdd.map(lambda word: (word,1)).reduceByKey(lambda a,b:a+b)
 
# rdd转为collecton并打印
resultColl = resultRdd.collect()
for line in resultColl:
    print(line)
 
# 结束
sc.stop()

----------
输出结果:
('zhangsan', 2)
('lisi', 2)

每个spark 应用(application)包含一个驱动(driver)程序,这个驱动程序运行用户的主函数,并在集群中执行并行化操作。Spark提供的主要抽象是一个弹性分布式数据集(resilientdistributed dataset,RDD),它是集群节点中进行分区的元素集合,可以进行并行操作。
RDDs是通过从Hadoop文件系统(或其他文件系统)中的文件或程序中scala的集合初始化创建,也可以通过其他RDD转换(transform)得到。
用户也可以令spark将RDD持久化(persist)在内存中,这可以使RDD在并行操作中的高效的重用。RDD可以从故障节点中自动恢复。

本地文件

from pyspark import SparkConf, SparkContext
 
# 创建SparkConf和SparkContext
conf = SparkConf().setMaster("local").setAppName("lichao-wordcount")
sc = SparkContext(conf=conf)
 
# 输入的数据
textFile = sc.textFile("/usr/local/spark_word/word.txt")
 
# 将Collection的data转化为spark中的rdd并进行操作
wordCount = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)
 
# rdd转为collecton并打印
wordColl =wordCount.collect()
for line in wordColl:
    print(line)
 
# 结束
sc.stop()

----------
输出结果:
('apache', 4)
('hadoop', 4)
('spark', 4)
('scala', 4)

HDFS文件

from pyspark import SparkConf, SparkContext
 
# 创建SparkConf和SparkContext
conf = SparkConf().setMaster("local").setAppName("lichao-wordcount")
sc = SparkContext(conf=conf)
 
# 输入的数据
textFile = sc.textFile("hdfs://192.168.110.110:9000/spark/wc/input/one.txt")
 
# 将Collection的data转化为spark中的rdd并进行操作
wordCount = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)
 
# rdd转为collecton并打印
wordColl =wordCount.collect()
for line in wordColl:
    print(line)
 
# 结束
sc.stop()

----------
输出结果:
('apache', 4)
('hadoop', 4)
('spark', 4)
('scala', 4)

将输出流输出到hdfs中。

from pyspark import SparkContext
 
# 创建SparkConf和SparkContext
conf = SparkConf().setMaster("local").setAppName("lichao-wordcount")
sc = SparkContext(conf=conf)

#制定输入输出路径
inputFile = 'hdfs://192.168.110.110:9000/spark/wc/input/one.txt'        #测试文档
outputFile = 'hdfs://192.168.110.110:9000/spark/wc/output'    #结果目录
 
text_file = sc.textFile(inputFile)
 
counts = text_file.flatMap(lambda line: line.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
counts.saveAsTextFile(outputFile)

最后来写一下spark如何提交Python文件的作业。
使用的命令(spark-submit命令详解与调优方案

spark-submit --master local[2] --num-executors 2 --executor-memory 1G wordcount1.py 
运行结果

相关文章

网友评论

      本文标题:2.11.3PySpark--WordCount

      本文链接:https://www.haomeiwen.com/subject/jhwjuhtx.html