美文网首页
pyspark_2_入门篇(编写我们的第一个程序WordCoun

pyspark_2_入门篇(编写我们的第一个程序WordCoun

作者: NikolasNull | 来源:发表于2019-12-02 23:01 被阅读0次

    跟着Leo学习PySpark

    chapter2——编写我们的第一个程序WordCount

    上一章我们大致讲了一下pyspark的基本理论和重要概念,如果想系统化且更深入地理解spark中的概念,还请移步官方文档,这一章,将用一个我们耳熟能详的WordCount小例子,零距离感受下pyspark的简单使用

    from pyspark import SparkContext, SparkConf
    
    # 编写Spark程序做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。
    # 要创建SparkContext,首先需要创建一个SparkConf对象,该对象包含有关您的应用程序的信息。
    
    # conf = SparkConf().setAppName(appName).setMaster(master)
    # sc = SparkContext(conf=conf)
    
    conf = SparkConf().setAppName("leo-study-spark").setMaster("local")
    sc = SparkContext(conf=conf)
    
    # 1. appName参数是您的应用程序在群集UI上显示的名称。
    # 2. master是一个Spark,Mesos或YARN群集URL,或一个特殊的“local”字符串,以本地模式运行。
    # 3. 对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark
    
    # spark程序围绕RDD的概念展开,创建RDD的方式有两种:并行化驱动程序中的现有集合,或外部存储系统
    
    # 1. 所谓并行化驱动程序中的现有集合,说白了,就类似于本地的一个数组变量
    # 2. 外部存储系统可以是本地文件系统、HDFS
    
    # 方式一:从本地数组变量中创建一个RDD
    
    data = [1, 2, 3, 4, 5]
    rdd = sc.parallelize(data)
    print type(rdd)
    
    # rdd被创建后,就可以并行化处理,列如我们可以调用map做一步转换操作,
    # 然后调用reduce聚合计算我们的数据集,最后使用print打印输出。
    result = rdd.map(lambda x: x+1).reduce(lambda a, b: a + b)
    print result
    
    # 可以看到输出的结果是20
    # 程序首先进行了一个map转换操作,即对数据集中的每一个元素都加上1
    # 其次,又对这个RDD进行了reduce的累加操作,最后输出元素累加后的结果
    
    <class 'pyspark.rdd.RDD'>
    20
    
    # 方式二:从本地文件系统中创建一个RDD,并演示我们今天的第一个入门级小程序,WordCount
    # test.txt 文本内容如下:
    # I love china
    # china is my home
    # I love yyf
    # yyf is a beautiful girl
    
    file_path = "/Users/mac/software/conda-demo/test_data/test.txt"
    
    # 从文件系统中创建RDD,调用sc.textFile(filePath)
    
    rdd = sc.textFile(file_path)
    
    # textFile("/my/directory"), textFile("/my/directory/*.txt"), textFile("/my/directory/*.gz")
    
    print 'textFile 将文件映射成RDD,RDD中的每一个元素是文件中的一行内容'
    print rdd.collect()
    print '--------------------------------------------------'
    
    # flatMap 其实是先做了map操作,然后在此基础上又做了一层合并操作,大家可以看到
    
    rdd = rdd.flatMap(lambda x: x.split(" "))
    
    print rdd.collect()
    print 'flatMap 先做了map操作,把RDD每一行内容按空格分隔,映射成为一个字符串数组,再做了合并操作'
    print '--------------------------------------------------'
    
    rdd = rdd.map(lambda x:(x, 1))
    
    print rdd.collect()
    print "map 操作对RDD数据集中的每一个单词进行计数"
    print '--------------------------------------------------'
    
    rdd = rdd.reduceByKey(lambda x, y: x + y)
    print 'reduceByKey 对数据集中每一个元组结构的第一个元素,分组后进行累加计算,统计出文章中每个单词出现的频次,然后把结果输出'
    
    print rdd.collect()
    
    
    textFile 将文件映射成RDD,RDD中的每一个元素是文件中的一行内容
    [u'I love china', u'china is my home', u'I love yyf', u'yyf is a beautiful girl']
    --------------------------------------------------
    [u'I', u'love', u'china', u'china', u'is', u'my', u'home', u'I', u'love', u'yyf', u'yyf', u'is', u'a', u'beautiful', u'girl']
    flatMap 先做了map操作,把RDD每一行内容按空格分隔,映射成为一个字符串数组,再做了合并操作
    --------------------------------------------------
    [(u'I', 1), (u'love', 1), (u'china', 1), (u'china', 1), (u'is', 1), (u'my', 1), (u'home', 1), (u'I', 1), (u'love', 1), (u'yyf', 1), (u'yyf', 1), (u'is', 1), (u'a', 1), (u'beautiful', 1), (u'girl', 1)]
    map 操作对RDD数据集中的每一个单词进行计数
    --------------------------------------------------
    reduceByKey 对数据集中每一个元组结构的第一个元素,分组后进行累加计算,统计出文章中每个单词出现的频次,然后把结果输出
    [(u'a', 1), (u'beautiful', 1), (u'love', 2), (u'I', 2), (u'is', 2), (u'yyf', 2), (u'china', 2), (u'home', 1), (u'girl', 1), (u'my', 1)]
    
    # 同样演示基于本地集合的WordCount程序,依旧是同样的输入与输出
    data = ["I love china",
    "china is my home",
    "I love yyf",
    "yyf is a beautiful girl"]
    
    rdd = sc.parallelize(data)
    print rdd.collect()
    
    rdd = rdd.flatMap(lambda x: x.split(' '))
    
    print rdd.collect()
    
    rdd = rdd.map(lambda x: (x, 1))
    
    print rdd.collect()
    
    rdd = rdd.reduceByKey(lambda x, y: x + y)
    
    print rdd.collect()
    
    ['I love china', 'china is my home', 'I love yyf', 'yyf is a beautiful girl']
    ['I', 'love', 'china', 'china', 'is', 'my', 'home', 'I', 'love', 'yyf', 'yyf', 'is', 'a', 'beautiful', 'girl']
    [('I', 1), ('love', 1), ('china', 1), ('china', 1), ('is', 1), ('my', 1), ('home', 1), ('I', 1), ('love', 1), ('yyf', 1), ('yyf', 1), ('is', 1), ('a', 1), ('beautiful', 1), ('girl', 1)]
    [('a', 1), ('beautiful', 1), ('love', 2), ('I', 2), ('is', 2), ('yyf', 2), ('china', 2), ('home', 1), ('girl', 1), ('my', 1)]
    
    #  除了上述方式,还有很多创建RDD的方式,从HDFS文件系统中创建RDD,只用把file_path路径换成我们的HDFS路径即可
    #到这里,我们可以知晓,spark程序说白了就是对一个超大数据集(一台机器跑不动),转换成可并行计算的RDD数据集,然后被分发到不同的计算节点
    #去执行,经过一系列的转换操作,最终触发Action操作,要么把计算结果收集,要么输出到其他存储介质中
    
    #  显式地声明SparkContext的做法,在目前版本的Spark(2.x)中是不被推荐的,pache Spark 2.0引入了SparkSession,
    #为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,
    #它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互,而且,在之后的课程中也将采用这样的方式去声明spark操作对象。
    #  
    #  下面将演示通过SparkSession来实现同样的WordCount功能,而不需要显式地创建SparkConf,SparkContext,
    #因为这些对象已经封装在SparkSession中。
    #
    #  使用生成器的设计模式(builder design pattern),如果我们没有创建SparkSession对象,
    #则会实例化出一个新的SparkSession对象及其相关的上下文。
    #  
    #  一样的输入与输出,大家可以自己尝试一步一步调试,再次感受spark程序的运行规律。
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("leo-study-spark").getOrCreate()
    
    rdd = spark.read.text('/Users/mac/software/conda-demo/test_data/test.txt').rdd.map(lambda x: x[0])
    rdd = rdd.flatMap(lambda x: x.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
    
    for (word, count) in rdd.collect():
        print "%s: %d" %(word, count)
        
    spark.stop()
    
    a: 1
    beautiful: 1
    love: 2
    I: 2
    is: 2
    yyf: 2
    china: 2
    home: 1
    girl: 1
    my: 1
    

    相关文章

      网友评论

          本文标题:pyspark_2_入门篇(编写我们的第一个程序WordCoun

          本文链接:https://www.haomeiwen.com/subject/ibjqgctx.html